You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:10 UTC
[32/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
index 1788dab..d681236 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
@@ -48,26 +48,27 @@ public class ServerCallbackHandler implements CallbackHandler {
private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
private String userName;
- private final Map<String,String> credentials = new HashMap<String,String>();
+ private final Map<String, String> credentials = new HashMap<String, String>();
public ServerCallbackHandler(Configuration configuration) throws IOException {
- if (configuration==null) return;
+ if (configuration == null)
+ return;
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+ String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + "' entry in this configuration: Server cannot start.";
throw new IOException(errorMessage);
}
credentials.clear();
- for(AppConfigurationEntry entry: configurationEntries) {
- Map<String,?> options = entry.getOptions();
+ for (AppConfigurationEntry entry : configurationEntries) {
+ Map<String, ?> options = entry.getOptions();
// Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
// Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
- for(Map.Entry<String, ?> pair : options.entrySet()) {
+ for (Map.Entry<String, ?> pair : options.entrySet()) {
String key = pair.getKey();
if (key.startsWith(USER_PREFIX)) {
String userName = key.substring(USER_PREFIX.length());
- credentials.put(userName,(String)pair.getValue());
+ credentials.put(userName, (String) pair.getValue());
}
}
}
@@ -98,7 +99,7 @@ public class ServerCallbackHandler implements CallbackHandler {
if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
// superuser: use Java system property for password, if available.
pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
- } else if (credentials.containsKey(userName) ) {
+ } else if (credentials.containsKey(userName)) {
pc.setPassword(credentials.get(userName).toCharArray());
} else {
LOG.warn("No password found for user: " + userName);
@@ -106,7 +107,7 @@ public class ServerCallbackHandler implements CallbackHandler {
}
private void handleRealmCallback(RealmCallback rc) {
- LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
+ LOG.debug("handleRealmCallback: " + rc.getDefaultText());
rc.setText(rc.getDefaultText());
}
@@ -114,14 +115,14 @@ public class ServerCallbackHandler implements CallbackHandler {
String authenticationID = ac.getAuthenticationID();
LOG.info("Successfully authenticated client: authenticationID = " + authenticationID + " authorizationID = " + ac.getAuthorizationID());
- //if authorizationId is not set, set it to authenticationId.
- if(ac.getAuthorizationID() == null) {
+ // if authorizationId is not set, set it to authenticationId.
+ if (ac.getAuthorizationID() == null) {
ac.setAuthorizedID(authenticationID);
}
- //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
- //add the authNid as the real user in reqContext's subject which will be used during authorization.
- if(!authenticationID.equals(ac.getAuthorizationID())) {
+ // When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
+ // add the authNid as the real user in reqContext's subject which will be used during authorization.
+ if (!authenticationID.equals(ac.getAuthorizationID())) {
LOG.info("Impersonation attempt authenticationID = " + ac.getAuthenticationID() + " authorizationID = " + ac.getAuthorizationID());
ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
index aed1c4f..116febb 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
@@ -63,7 +63,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
private static KerberosTicket getTGT(Subject subject) {
Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
- for(KerberosTicket ticket: tickets) {
+ for (KerberosTicket ticket : tickets) {
KerberosPrincipal server = ticket.getServer();
if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
tickets = null;
@@ -72,26 +72,26 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
}
tickets = null;
return null;
- }
+ }
@Override
public void populateCredentials(Map<String, String> credentials) {
- //Log the user in and get the TGT
+ // Log the user in and get the TGT
try {
Configuration login_conf = AuthUtils.GetConfiguration(conf);
ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
-
- //login our user
- Configuration.setConfiguration(login_conf);
+
+ // login our user
+ Configuration.setConfiguration(login_conf);
LoginContext lc = new LoginContext(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
try {
lc.login();
final Subject subject = lc.getSubject();
KerberosTicket tgt = getTGT(subject);
- if (tgt == null) { //error
- throw new RuntimeException("Fail to verify user principal with section \""
- +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+ if (tgt == null) { // error
+ throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_CLIENT
+ + "\" in login configuration file " + login_conf);
}
if (!tgt.isForwardable()) {
@@ -102,7 +102,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
throw new RuntimeException("The TGT found is not renewable");
}
- LOG.info("Pushing TGT for "+tgt.getClient()+" to topology.");
+ LOG.info("Pushing TGT for " + tgt.getClient() + " to topology.");
saveTGT(tgt, credentials);
} finally {
lc.logout();
@@ -131,7 +131,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
try {
ByteArrayInputStream bin = new ByteArrayInputStream(DatatypeConverter.parseBase64Binary(credentials.get("TGT")));
ObjectInputStream in = new ObjectInputStream(bin);
- ret = (KerberosTicket)in.readObject();
+ ret = (KerberosTicket) in.readObject();
in.close();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -155,16 +155,16 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
KerberosTicket tgt = getTGT(credentials);
if (tgt != null) {
Set<Object> creds = subject.getPrivateCredentials();
- synchronized(creds) {
+ synchronized (creds) {
Iterator<Object> iterator = creds.iterator();
while (iterator.hasNext()) {
Object o = iterator.next();
if (o instanceof KerberosTicket) {
- KerberosTicket t = (KerberosTicket)o;
+ KerberosTicket t = (KerberosTicket) o;
iterator.remove();
try {
t.destroy();
- } catch (DestroyFailedException e) {
+ } catch (DestroyFailedException e) {
LOG.warn("Failed to destory ticket ", e);
}
}
@@ -179,8 +179,8 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
}
/**
- * Hadoop does not just go off of a TGT, it needs a bit more. This
- * should fill in the rest.
+ * Hadoop does not just go off of a TGT, it needs a bit more. This should fill in the rest.
+ *
* @param subject the subject that should have a TGT in it.
*/
private void loginHadoopUser(Subject subject) {
@@ -193,23 +193,21 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
}
try {
Method isSecEnabled = ugi.getMethod("isSecurityEnabled");
- if (!((Boolean)isSecEnabled.invoke(null))) {
- LOG.warn("Hadoop is on the classpath but not configured for " +
- "security, if you want security you need to be sure that " +
- "hadoop.security.authentication=kerberos in core-site.xml " +
- "in your jar");
+ if (!((Boolean) isSecEnabled.invoke(null))) {
+ LOG.warn("Hadoop is on the classpath but not configured for " + "security, if you want security you need to be sure that "
+ + "hadoop.security.authentication=kerberos in core-site.xml " + "in your jar");
return;
}
-
+
try {
Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
login.invoke(null, subject);
} catch (NoSuchMethodException me) {
- //The version of Hadoop does not have the needed client changes.
+ // The version of Hadoop does not have the needed client changes.
// So don't look now, but do something really ugly to work around it.
// This is because we are reaching into the hidden bits of Hadoop security, and it works for now, but may stop at any point in time.
- //We are just trying to do the following
+ // We are just trying to do the following
// Configuration conf = new Configuration();
// HadoopKerberosName.setConfiguration(conf);
// subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null));
@@ -220,7 +218,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
Constructor confCons = confClass.getConstructor();
Object conf = confCons.newInstance();
Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName");
- Method hknSetConf = hknClass.getMethod("setConfiguration",confClass);
+ Method hknSetConf = hknClass.getMethod("setConfiguration", confClass);
hknSetConf.invoke(null, conf);
Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod");
@@ -236,7 +234,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
Constructor userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class);
userCons.setAccessible(true);
Object user = userCons.newInstance(name, kerbAuthMethod, null);
- subject.getPrincipals().add((Principal)user);
+ subject.getPrincipals().add((Principal) user);
}
} catch (Exception e) {
LOG.warn("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop may not be compatible.", e);
@@ -250,14 +248,14 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
}
@Override
- public void renew(Map<String,String> credentials, Map topologyConf) {
+ public void renew(Map<String, String> credentials, Map topologyConf) {
KerberosTicket tgt = getTGT(credentials);
if (tgt != null) {
long refreshTime = getRefreshTime(tgt);
long now = System.currentTimeMillis();
if (now >= refreshTime) {
try {
- LOG.info("Renewing TGT for "+tgt.getClient());
+ LOG.info("Renewing TGT for " + tgt.getClient());
tgt.refresh();
saveTGT(tgt, credentials);
} catch (RefreshFailedException e) {
@@ -272,10 +270,10 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
Map conf = new java.util.HashMap();
conf.put("java.security.auth.login.config", args[0]);
at.prepare(conf);
- Map<String,String> creds = new java.util.HashMap<String,String>();
+ Map<String, String> creds = new java.util.HashMap<String, String>();
at.populateCredentials(creds);
Subject s = new Subject();
at.populateSubject(s, creds);
- LOG.info("Got a Subject "+s);
+ LOG.info("Got a Subject " + s);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
index 807abe3..647e240 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
@@ -29,7 +29,6 @@ import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.LoginException;
import javax.security.auth.spi.LoginModule;
-
/**
* Custom LoginModule to enable Auto Login based on cached ticket
*/
@@ -41,10 +40,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule {
protected KerberosTicket kerbTicket = null;
- public void initialize(Subject subject,
- CallbackHandler callbackHandler,
- Map<String, ?> sharedState,
- Map<String, ?> options) {
+ public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
this.subject = subject;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
index ba34fc9..6188566 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
@@ -31,7 +31,7 @@ public class AutoTGTKrb5LoginModuleTest extends AutoTGTKrb5LoginModule {
public void setKerbTicket(KerberosTicket ticket) {
this.kerbTicket = ticket;
}
-
+
@Override
protected void getKerbTicketFromCache() {
// Do nothing.
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
index d46aa8b..13a2cba 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -49,11 +49,11 @@ public class ClientCallbackHandler implements CallbackHandler {
* @throws IOException
*/
public ClientCallbackHandler(Configuration configuration) throws IOException {
- if (configuration == null) return;
+ if (configuration == null)
+ return;
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
- + "' entry in this configuration: Client cannot start.";
+ String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT + "' entry in this configuration: Client cannot start.";
LOG.error(errorMessage);
throw new IOException(errorMessage);
}
@@ -61,7 +61,8 @@ public class ClientCallbackHandler implements CallbackHandler {
/**
* This method is invoked by SASL for authentication challenges
- * @param callbacks a collection of challenge callbacks
+ *
+ * @param callbacks a collection of challenge callbacks
*/
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback c : callbacks) {
@@ -69,20 +70,18 @@ public class ClientCallbackHandler implements CallbackHandler {
LOG.debug("name callback");
} else if (c instanceof PasswordCallback) {
LOG.debug("password callback");
- LOG.warn("Could not login: the client is being asked for a password, but the " +
- " client code does not currently support obtaining a password from the user." +
- " Make sure that the client is configured to use a ticket cache (using" +
- " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
- " you still get this message after that, the TGT in the ticket cache has expired and must" +
- " be manually refreshed. To do so, first determine if you are using a password or a" +
- " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
- " is running this client using the command" +
- " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
- " If the latter, do" +
- " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
- " <keytab> is the location of the keytab file). After manually refreshing your cache," +
- " restart this client. If you continue to see this message after manually refreshing" +
- " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+ LOG.warn("Could not login: the client is being asked for a password, but the "
+ + " client code does not currently support obtaining a password from the user."
+ + " Make sure that the client is configured to use a ticket cache (using"
+ + " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If"
+ + " you still get this message after that, the TGT in the ticket cache has expired and must"
+ + " be manually refreshed. To do so, first determine if you are using a password or a"
+ + " keytab. If the former, run kinit in a Unix shell in the environment of the user who" + " is running this client using the command"
+ + " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." + " If the latter, do"
+ + " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and"
+ + " <keytab> is the location of the keytab file). After manually refreshing your cache,"
+ + " restart this client. If you continue to see this message after manually refreshing"
+ + " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
} else if (c instanceof AuthorizeCallback) {
LOG.debug("authorization callback");
AuthorizeCallback ac = (AuthorizeCallback) c;
@@ -96,7 +95,7 @@ public class ClientCallbackHandler implements CallbackHandler {
if (ac.isAuthorized()) {
ac.setAuthorizedID(authzid);
}
- } else {
+ } else {
throw new UnsupportedCallbackException(c);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index ecb0daf..e257a8a 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -48,19 +48,19 @@ import backtype.storm.security.auth.AuthUtils;
import backtype.storm.security.auth.SaslTransportPlugin;
public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
- public static final String KERBEROS = "GSSAPI";
+ public static final String KERBEROS = "GSSAPI";
private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
public TTransportFactory getServerTransportFactory() throws IOException {
- //create an authentication callback handler
+ // create an authentication callback handler
CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, storm_conf);
-
- //login our principal
+
+ // login our principal
Subject subject = null;
try {
- //specify a configuration object to be used
- Configuration.setConfiguration(login_conf);
- //now login
+ // specify a configuration object to be used
+ Configuration.setConfiguration(login_conf);
+ // now login
Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler);
subject = login.getSubject();
} catch (LoginException ex) {
@@ -68,27 +68,27 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
throw new RuntimeException(ex);
}
- //check the credential of our principal
- if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
- throw new RuntimeException("Fail to verify user principal with section \""
- +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
+ // check the credential of our principal
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+ throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file "
+ + login_conf);
}
- String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
- LOG.debug("principal:"+principal);
+ String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
+ LOG.debug("principal:" + principal);
KerberosName serviceKerberosName = new KerberosName(principal);
String serviceName = serviceKerberosName.getServiceName();
String hostName = serviceKerberosName.getHostName();
- Map<String, String> props = new TreeMap<String,String>();
+ Map<String, String> props = new TreeMap<String, String>();
props.put(Sasl.QOP, "auth");
props.put(Sasl.SERVER_AUTH, "false");
- //create a transport factory that will invoke our auth callback for digest
+ // create a transport factory that will invoke our auth callback for digest
TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler);
- //create a wrap transport factory so that we could apply user credential during connections
- TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
+ // create a wrap transport factory so that we could apply user credential during connections
+ TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
LOG.info("SASL GSSAPI transport factory will be used");
return wrapFactory;
@@ -96,55 +96,47 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
@Override
public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
- //create an authentication callback handler
+ // create an authentication callback handler
ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
-
- //login our user
+
+ // login our user
Login login = null;
- try {
- //specify a configuration object to be used
- Configuration.setConfiguration(login_conf);
- //now login
- login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
+ try {
+ // specify a configuration object to be used
+ Configuration.setConfiguration(login_conf);
+ // now login
+ login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
} catch (LoginException ex) {
LOG.error("Server failed to login in principal:" + ex, ex);
throw new RuntimeException(ex);
}
final Subject subject = login.getSubject();
- if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
- throw new RuntimeException("Fail to verify user principal with section \""
- +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { // error
+ throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file "
+ + login_conf);
}
final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
if (serviceName == null) {
- serviceName = AuthUtils.SERVICE;
+ serviceName = AuthUtils.SERVICE;
}
- Map<String, String> props = new TreeMap<String,String>();
+ Map<String, String> props = new TreeMap<String, String>();
props.put(Sasl.QOP, "auth");
props.put(Sasl.SERVER_AUTH, "false");
LOG.debug("SASL GSSAPI client transport is being established");
- final TTransport sasalTransport = new TSaslClientTransport(KERBEROS,
- principal,
- serviceName,
- serverHost,
- props,
- null,
- transport);
-
- //open Sasl transport with the login credential
+ final TTransport sasalTransport = new TSaslClientTransport(KERBEROS, principal, serviceName, serverHost, props, null, transport);
+
+ // open Sasl transport with the login credential
try {
- Subject.doAs(subject,
- new PrivilegedExceptionAction<Void>() {
+ Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
public Void run() {
try {
- LOG.debug("do as:"+ principal);
+ LOG.debug("do as:" + principal);
sasalTransport.open();
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e);
}
return null;
@@ -158,19 +150,18 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
}
private String getPrincipal(Subject subject) {
- Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
- if (principals==null || principals.size()<1) {
+ Set<Principal> principals = (Set<Principal>) subject.getPrincipals();
+ if (principals == null || principals.size() < 1) {
LOG.info("No principal found in login subject");
return null;
}
- return ((Principal)(principals.toArray()[0])).getName();
+ return ((Principal) (principals.toArray()[0])).getName();
}
- /** A TransportFactory that wraps another one, but assumes a specified UGI
- * before calling through.
- *
- * This is used on the server side to assume the server's Principal when accepting
- * clients.
+ /**
+ * A TransportFactory that wraps another one, but assumes a specified UGI before calling through.
+ *
+ * This is used on the server side to assume the server's Principal when accepting clients.
*/
static class TUGIAssumingTransportFactory extends TTransportFactory {
private final Subject subject;
@@ -180,21 +171,19 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
this.wrapped = wrapped;
this.subject = subject;
- Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
- if (principals.size()>0)
- LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
+ Set<Principal> principals = (Set<Principal>) subject.getPrincipals();
+ if (principals.size() > 0)
+ LOG.info("Service principal:" + ((Principal) (principals.toArray()[0])).getName());
}
@Override
public TTransport getTransport(final TTransport trans) {
try {
- return Subject.doAs(subject,
- new PrivilegedExceptionAction<TTransport>() {
+ return Subject.doAs(subject, new PrivilegedExceptionAction<TTransport>() {
public TTransport run() {
try {
return wrapped.getTransport(trans);
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
return null;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
index 7b143f0..0e32e0b 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -41,11 +41,12 @@ public class ServerCallbackHandler implements CallbackHandler {
private String userName;
public ServerCallbackHandler(Configuration configuration, Map stormConf) throws IOException {
- if (configuration==null) return;
+ if (configuration == null)
+ return;
AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
if (configurationEntries == null) {
- String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+ String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + "' entry in this configuration: Server cannot start.";
LOG.error(errorMessage);
throw new IOException(errorMessage);
}
@@ -78,14 +79,14 @@ public class ServerCallbackHandler implements CallbackHandler {
String authenticationID = ac.getAuthenticationID();
LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
- //if authorizationId is not set, set it to authenticationId.
- if(ac.getAuthorizationID() == null) {
+ // if authorizationId is not set, set it to authenticationId.
+ if (ac.getAuthorizationID() == null) {
ac.setAuthorizedID(authenticationID);
}
- //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
- //add the authNid as the real user in reqContext's subject which will be used during authorization.
- if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+ // When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
+ // add the authNid as the real user in reqContext's subject which will be used during authorization.
+ if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
index 8e66cdf..437cdbb 100644
--- a/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
@@ -40,8 +40,9 @@ import backtype.storm.Config;
*/
public class BlowfishTupleSerializer extends Serializer<ListDelegate> {
/**
- * The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory).
- * You should use in via "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ...".
+ * The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory). You should use in via
+ * "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ..."
+ * .
*/
public static String SECRET_KEY = "topology.tuple.serializer.blowfish.key";
private static final Logger LOG = LoggerFactory.getLogger(BlowfishTupleSerializer.class);
@@ -50,12 +51,12 @@ public class BlowfishTupleSerializer extends Serializer<ListDelegate> {
public BlowfishTupleSerializer(Kryo kryo, Map storm_conf) {
String encryption_key = null;
try {
- encryption_key = (String)storm_conf.get(SECRET_KEY);
+ encryption_key = (String) storm_conf.get(SECRET_KEY);
LOG.debug("Blowfish serializer being constructed ...");
if (encryption_key == null) {
throw new RuntimeException("Blowfish encryption key not specified");
}
- byte[] bytes = Hex.decodeHex(encryption_key.toCharArray());
+ byte[] bytes = Hex.decodeHex(encryption_key.toCharArray());
_serializer = new BlowfishSerializer(new ListDelegateSerializer(), bytes);
} catch (org.apache.commons.codec.DecoderException ex) {
throw new RuntimeException("Blowfish encryption key invalid", ex);
@@ -69,22 +70,23 @@ public class BlowfishTupleSerializer extends Serializer<ListDelegate> {
@Override
public ListDelegate read(Kryo kryo, Input input, Class<ListDelegate> type) {
- return (ListDelegate)_serializer.read(kryo, input, type);
+ return (ListDelegate) _serializer.read(kryo, input, type);
}
/**
* Produce a blowfish key to be used in "Storm jar" command
*/
public static void main(String[] args) {
- try{
+ try {
KeyGenerator kgen = KeyGenerator.getInstance("Blowfish");
SecretKey skey = kgen.generateKey();
byte[] raw = skey.getEncoded();
String keyString = new String(Hex.encodeHex(raw));
- System.out.println("storm -c "+SECRET_KEY+"="+keyString+" -c "+Config.TOPOLOGY_TUPLE_SERIALIZER+"="+BlowfishTupleSerializer.class.getName() + " ..." );
+ System.out.println("storm -c " + SECRET_KEY + "=" + keyString + " -c " + Config.TOPOLOGY_TUPLE_SERIALIZER + "="
+ + BlowfishTupleSerializer.class.getName() + " ...");
} catch (Exception ex) {
LOG.error(ex.getMessage());
ex.printStackTrace();
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
index a055eb2..91e629a 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
@@ -22,30 +22,29 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import java.util.Map;
-
public class DefaultKryoFactory implements IKryoFactory {
public static class KryoSerializableDefault extends Kryo {
boolean _override = false;
-
+
public void overrideDefault(boolean value) {
_override = value;
- }
-
+ }
+
@Override
public Serializer getDefaultSerializer(Class type) {
- if(_override) {
+ if (_override) {
return new SerializableSerializer();
} else {
return super.getDefaultSerializer(type);
}
- }
- }
-
+ }
+ }
+
@Override
public Kryo getKryo(Map conf) {
KryoSerializableDefault k = new KryoSerializableDefault();
- k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)));
+ k.setRegistrationRequired((Boolean) conf.get(Config.TOPOLOGY_KRYO_REGISTER_REQUIRED));
k.setReferences(false);
return k;
}
@@ -53,12 +52,12 @@ public class DefaultKryoFactory implements IKryoFactory {
@Override
public void preRegister(Kryo k, Map conf) {
}
-
+
public void postRegister(Kryo k, Map conf) {
- ((KryoSerializableDefault)k).overrideDefault(true);
+ ((KryoSerializableDefault) k).overrideDefault((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION));
}
@Override
- public void postDecorate(Kryo k, Map conf) {
- }
+ public void postDecorate(Kryo k, Map conf) {
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
index 6d986af..c97470f 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
@@ -48,10 +48,10 @@ public class DefaultSerializationDelegate implements SerializationDelegate {
ObjectInputStream ois = new ObjectInputStream(bis);
Object ret = ois.readObject();
ois.close();
- return (T)ret;
- } catch(IOException ioe) {
+ return (T) ret;
+ } catch (IOException ioe) {
throw new RuntimeException(ioe);
- } catch(ClassNotFoundException e) {
+ } catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
index c8377c3..4b7951e 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
@@ -22,8 +22,8 @@ import java.util.zip.GZIPInputStream;
/**
* Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes with gzip. If not, uses
- * {@link backtype.storm.serialization.DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled
- * via {@link #prepare(java.util.Map)} is passed through to both delegates.
+ * {@link DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled via {@link #prepare(Map)} is
+ * passed through to both delegates.
*/
@Deprecated
public class GzipBridgeSerializationDelegate implements SerializationDelegate {
@@ -47,7 +47,7 @@ public class GzipBridgeSerializationDelegate implements SerializationDelegate {
if (isGzipped(bytes)) {
return gzipDelegate.deserialize(bytes, clazz);
} else {
- return defaultDelegate.deserialize(bytes,clazz);
+ return defaultDelegate.deserialize(bytes, clazz);
}
}
@@ -59,7 +59,6 @@ public class GzipBridgeSerializationDelegate implements SerializationDelegate {
* Looks ahead to see if the GZIP magic constant is heading {@code bytes}
*/
private boolean isGzipped(byte[] bytes) {
- return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE)
- && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
+ return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
index e5e77c3..6d580db 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
@@ -22,8 +22,8 @@ import java.util.zip.GZIPInputStream;
/**
* Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes with gzip. If not, uses
- * {@link backtype.storm.serialization.ThriftSerializationDelegate} to deserialize. Any logic needing to be enabled
- * via {@link #prepare(java.util.Map)} is passed through to both delegates.
+ * {@link ThriftSerializationDelegate} to deserialize. Any logic needing to be enabled via {@link #prepare(Map)} is
+ * passed through to both delegates.
*/
public class GzipBridgeThriftSerializationDelegate implements SerializationDelegate {
@@ -46,7 +46,7 @@ public class GzipBridgeThriftSerializationDelegate implements SerializationDeleg
if (isGzipped(bytes)) {
return gzipDelegate.deserialize(bytes, clazz);
} else {
- return defaultDelegate.deserialize(bytes,clazz);
+ return defaultDelegate.deserialize(bytes, clazz);
}
}
@@ -58,7 +58,6 @@ public class GzipBridgeThriftSerializationDelegate implements SerializationDeleg
* Looks ahead to see if the GZIP magic constant is heading {@code bytes}
*/
private boolean isGzipped(byte[] bytes) {
- return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE)
- && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
+ return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
index 3c8ee8b..2b27af0 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
@@ -54,10 +54,10 @@ public class GzipSerializationDelegate implements SerializationDelegate {
ObjectInputStream ois = new ObjectInputStream(gis);
Object ret = ois.readObject();
ois.close();
- return (T)ret;
- } catch(IOException ioe) {
+ return (T) ret;
+ } catch (IOException ioe) {
throw new RuntimeException(ioe);
- } catch(ClassNotFoundException e) {
+ } catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
index 933a125..a76c080 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
@@ -49,7 +49,7 @@ public class GzipThriftSerializationDelegate implements SerializationDelegate {
try {
TBase instance = (TBase) clazz.newInstance();
new TDeserializer().deserialize(instance, Utils.gunzip(bytes));
- return (T)instance;
+ return (T) instance;
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
index b154a36..36e59a5 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
package backtype.storm.serialization;
+
import com.esotericsoftware.kryo.Kryo;
public interface IKryoDecorator {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
index 60a847d..b5c4522 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
@@ -21,20 +21,18 @@ import com.esotericsoftware.kryo.Kryo;
import java.util.Map;
/**
- * An interface that controls the Kryo instance used by Storm for serialization.
- * The lifecycle is:
+ * An interface that controls the Kryo instance used by Storm for serialization. The lifecycle is:
*
- * 1. The Kryo instance is constructed using getKryo
- * 2. Storm registers the default classes (e.g. arrays, lists, maps, etc.)
- * 3. Storm calls preRegister hook
- * 4. Storm registers all user-defined registrations through topology.kryo.register
- * 5. Storm calls postRegister hook
- * 6. Storm calls all user-defined decorators through topology.kryo.decorators
- * 7. Storm calls postDecorate hook
+ * 1. The Kryo instance is constructed using getKryo 2. Storm registers the default classes (e.g. arrays, lists, maps, etc.) 3. Storm calls preRegister hook 4.
+ * Storm registers all user-defined registrations through topology.kryo.register 5. Storm calls postRegister hook 6. Storm calls all user-defined decorators
+ * through topology.kryo.decorators 7. Storm calls postDecorate hook
*/
public interface IKryoFactory {
Kryo getKryo(Map conf);
+
void preRegister(Kryo k, Map conf);
+
void postRegister(Kryo k, Map conf);
+
void postDecorate(Kryo k, Map conf);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
index 4e68658..641a472 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
@@ -21,5 +21,5 @@ import backtype.storm.tuple.Tuple;
import java.io.IOException;
public interface ITupleDeserializer {
- Tuple deserialize(byte[] ser);
+ Tuple deserialize(byte[] ser);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
index 90ad932..68df8bf 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
@@ -19,8 +19,7 @@ package backtype.storm.serialization;
import backtype.storm.tuple.Tuple;
-
public interface ITupleSerializer {
byte[] serialize(Tuple tuple);
-// long crc32(Tuple tuple);
+ // long crc32(Tuple tuple);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
index 3496e68..bb8bcb4 100644
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
@@ -40,55 +40,65 @@ import java.util.Map;
public class KryoTupleDeserializer implements ITupleDeserializer {
private static final Logger LOG = LoggerFactory.getLogger(KryoTupleDeserializer.class);
-
+
public static final boolean USE_RAW_PACKET = true;
-
+
GeneralTopologyContext _context;
KryoValuesDeserializer _kryo;
SerializationFactory.IdDictionary _ids;
Input _kryoInput;
-
+
public KryoTupleDeserializer(final Map conf, final GeneralTopologyContext context) {
_kryo = new KryoValuesDeserializer(conf);
_context = context;
_ids = new SerializationFactory.IdDictionary(context.getRawTopology());
_kryoInput = new Input(1);
}
-
+
public Tuple deserialize(byte[] ser) {
-
+ _kryoInput.setBuffer(ser);
+ return deserialize(_kryoInput);
+ }
+
+ public Tuple deserialize(byte[] ser, int offset, int count) {
+ _kryoInput.setBuffer(ser, offset, count);
+ return deserialize(_kryoInput);
+ }
+
+ public Tuple deserialize(Input input) {
int targetTaskId = 0;
+ long timeStamp = 0l;
int taskId = 0;
int streamId = 0;
String componentName = null;
String streamName = null;
MessageId id = null;
-
+
try {
-
- _kryoInput.setBuffer(ser);
-
- targetTaskId = _kryoInput.readInt();
- taskId = _kryoInput.readInt(true);
- streamId = _kryoInput.readInt(true);
+ targetTaskId = input.readInt();
+ timeStamp = input.readLong();
+ taskId = input.readInt(true);
+ streamId = input.readInt(true);
componentName = _context.getComponentId(taskId);
streamName = _ids.getStreamName(componentName, streamId);
- id = MessageId.deserialize(_kryoInput);
- List<Object> values = _kryo.deserializeFrom(_kryoInput);
+ id = MessageId.deserialize(input);
+ List<Object> values = _kryo.deserializeFrom(input);
TupleImplExt tuple = new TupleImplExt(_context, values, taskId, streamName, id);
tuple.setTargetTaskId(targetTaskId);
+ tuple.setCreationTimeStamp(timeStamp);
return tuple;
} catch (Throwable e) {
StringBuilder sb = new StringBuilder();
-
+
sb.append("Deserialize error:");
sb.append("targetTaskId:").append(targetTaskId);
+ sb.append(",creationTimeStamp:").append(timeStamp);
sb.append(",taskId:").append(taskId);
sb.append(",streamId:").append(streamId);
sb.append(",componentName:").append(componentName);
sb.append(",streamName:").append(streamName);
sb.append(",MessageId").append(id);
-
+
LOG.info(sb.toString(), e);
throw new RuntimeException(e);
}
@@ -99,15 +109,14 @@ public class KryoTupleDeserializer implements ITupleDeserializer {
int offset = 0;
while(offset < ser.length) {
- int tupleSize = Utils.readIntFromByteArray(ser, offset);
+ _kryoInput.setBuffer(ser, offset, offset + 4);
+ int tupleSize = _kryoInput.readInt();
offset += 4;
- ByteBuffer buff = ByteBuffer.allocate(tupleSize);
- buff.put(ser, offset, tupleSize);
- ret.addToBatch(deserialize(buff.array()));
+ ret.addToBatch(deserialize(ser, offset, offset + tupleSize));
offset += tupleSize;
}
-
+
return ret;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
index 1c53d5d..e49e58b 100644
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
@@ -33,30 +33,35 @@ public class KryoTupleSerializer implements ITupleSerializer {
KryoValuesSerializer _kryo;
SerializationFactory.IdDictionary _ids;
Output _kryoOut;
-
+
public KryoTupleSerializer(final Map conf, final GeneralTopologyContext context) {
_kryo = new KryoValuesSerializer(conf);
_kryoOut = new Output(2000, 2000000000);
_ids = new SerializationFactory.IdDictionary(context.getRawTopology());
}
-
+
+ public byte[] serialize(Tuple tuple) {
+ _kryoOut.clear();
+ serializeTuple(_kryoOut, tuple);
+ return _kryoOut.toBytes();
+ }
/**
* @@@ in the furture, it will skill serialize 'targetTask' through check some flag
- * @see backtype.storm.serialization.ITupleSerializer#serialize(int, backtype.storm.tuple.Tuple)
+ * @see ITupleSerializer#serialize(int, Tuple)
*/
- public byte[] serialize(Tuple tuple) {
+ private void serializeTuple(Output output, Tuple tuple) {
try {
-
- _kryoOut.clear();
if (tuple instanceof TupleExt) {
- _kryoOut.writeInt(((TupleExt) tuple).getTargetTaskId());
+ output.writeInt(((TupleExt) tuple).getTargetTaskId());
+ output.writeLong(((TupleExt) tuple).getCreationTimeStamp());
}
-
- _kryoOut.writeInt(tuple.getSourceTask(), true);
- _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
- tuple.getMessageId().serialize(_kryoOut);
- _kryo.serializeInto(tuple.getValues(), _kryoOut);
- return _kryoOut.toBytes();
+
+ output.writeInt(tuple.getSourceTask(), true);
+ output.writeInt(
+ _ids.getStreamId(tuple.getSourceComponent(),
+ tuple.getSourceStreamId()), true);
+ tuple.getMessageId().serialize(output);
+ _kryo.serializeInto(tuple.getValues(), output);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -66,31 +71,28 @@ public class KryoTupleSerializer implements ITupleSerializer {
if (batch == null || batch.currBatchSize() == 0)
return null;
- byte[][] bytes = new byte[batch.currBatchSize()][];
- int i = 0, len = 0;
+ _kryoOut.clear();
for (Tuple tuple : batch.getTuples()) {
- /* byte structure:
+ /*
+ * byte structure:
* 1st tuple: length + tuple bytes
* 2nd tuple: length + tuple bytes
* ......
*/
- bytes[i] = serialize(tuple);
- len += bytes[i].length;
- // add length bytes (int)
- len += 4;
- i++;
- }
-
- byte[] ret = new byte[len];
- int index = 0;
- for (i = 0; i < bytes.length; i++) {
- Utils.writeIntToByteArray(ret, index, bytes[i].length);
- index += 4;
- for (int j = 0; j < bytes[i].length; j++) {
- ret[index++] = bytes[i][j];
- }
+ int startPos = _kryoOut.position();
+
+ // Set initial value of tuple length, which will be updated accordingly after serialization
+ _kryoOut.writeInt(0);
+
+ serializeTuple(_kryoOut, tuple);
+
+ // Update the tuple length
+ int endPos = _kryoOut.position();
+ _kryoOut.setPosition(startPos);
+ _kryoOut.writeInt(endPos - startPos - 4);
+ _kryoOut.setPosition(endPos);
}
- return ret;
+ return _kryoOut.toBytes();
}
public static byte[] serialize(int targetTask) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
index 209ae53..45a7376 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
@@ -28,22 +28,22 @@ import java.util.Map;
public class KryoValuesDeserializer {
Kryo _kryo;
Input _kryoInput;
-
+
public KryoValuesDeserializer(Map conf) {
_kryo = SerializationFactory.getKryo(conf);
_kryoInput = new Input(1);
}
-
+
public List<Object> deserializeFrom(Input input) {
- ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
- return delegate.getDelegate();
+ ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
+ return delegate.getDelegate();
}
-
+
public List<Object> deserialize(byte[] ser) throws IOException {
_kryoInput.setBuffer(ser);
return deserializeFrom(_kryoInput);
}
-
+
public Object deserializeObject(byte[] ser) throws IOException {
_kryoInput.setBuffer(ser);
return _kryo.readClassAndObject(_kryoInput);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
index c4a2f71..d53f1bd 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
@@ -28,28 +28,28 @@ public class KryoValuesSerializer {
Kryo _kryo;
ListDelegate _delegate;
Output _kryoOut;
-
+
public KryoValuesSerializer(Map conf) {
_kryo = SerializationFactory.getKryo(conf);
_delegate = new ListDelegate();
_kryoOut = new Output(2000, 2000000000);
}
-
+
public void serializeInto(List<Object> values, Output out) throws IOException {
// this ensures that list of values is always written the same way, regardless
- // of whether it's a java collection or one of clojure's persistent collections
+ // of whether it's a java collection or one of clojure's persistent collections
// (which have different serializers)
// Doing this lets us deserialize as ArrayList and avoid writing the class here
_delegate.setDelegate(values);
- _kryo.writeObject(out, _delegate);
+ _kryo.writeObject(out, _delegate);
}
-
+
public byte[] serialize(List<Object> values) throws IOException {
_kryoOut.clear();
serializeInto(values, _kryoOut);
return _kryoOut.toBytes();
}
-
+
public byte[] serializeObject(Object obj) {
_kryoOut.clear();
_kryo.writeClassAndObject(_kryoOut, obj);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
index 376ad2a..b60e8b8 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
@@ -30,7 +30,7 @@ import java.io.ObjectOutputStream;
import org.apache.commons.io.input.ClassLoaderObjectInputStream;
public class SerializableSerializer extends Serializer<Object> {
-
+
@Override
public void write(Kryo kryo, Output output, Object object) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -45,7 +45,7 @@ public class SerializableSerializer extends Serializer<Object> {
output.writeInt(ser.length);
output.writeBytes(ser);
}
-
+
@Override
public Object read(Kryo kryo, Input input, Class c) {
int len = input.readInt();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
index ef859be..ebf6158 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.types.ArrayListSerializer;
-import backtype.storm.serialization.types.ListDelegateSerializer;
import backtype.storm.serialization.types.HashMapSerializer;
import backtype.storm.serialization.types.HashSetSerializer;
import backtype.storm.transactional.TransactionAttempt;
@@ -33,27 +32,22 @@ import carbonite.JavaBridge;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigIntegerSerializer;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.math.BigInteger;
+import java.util.*;
+
public class SerializationFactory {
public static final Logger LOG = LoggerFactory.getLogger(SerializationFactory.class);
-
+
public static Kryo getKryo(Map conf) {
IKryoFactory kryoFactory = (IKryoFactory) Utils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
Kryo k = kryoFactory.getKryo(conf);
if (WorkerClassLoader.getInstance() != null)
k.setClassLoader(WorkerClassLoader.getInstance());
k.register(byte[].class);
-
+
/* tuple payload serializer is specified via configuration */
String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
try {
@@ -63,7 +57,7 @@ public class SerializationFactory {
} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
-
+
k.register(ArrayList.class, new ArrayListSerializer());
k.register(HashMap.class, new HashMapSerializer());
k.register(HashSet.class, new HashSetSerializer());
@@ -78,17 +72,17 @@ public class SerializationFactory {
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
Map<String, String> registrations = normalizeKryoRegister(conf);
-
+
kryoFactory.preRegister(k, conf);
-
+
boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
for (String klassName : registrations.keySet()) {
String serializerClassName = registrations.get(klassName);
try {
Class klass = Class.forName(klassName, true, k.getClassLoader());
-
+
Class serializerClass = null;
if (serializerClassName != null)
serializerClass = Class.forName(serializerClassName, true, k.getClassLoader());
@@ -105,9 +99,9 @@ public class SerializationFactory {
}
}
}
-
+
kryoFactory.postRegister(k, conf);
-
+
if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
try {
@@ -127,21 +121,21 @@ public class SerializationFactory {
}
}
}
-
+
kryoFactory.postDecorate(k, conf);
-
+
return k;
}
-
+
public static class IdDictionary {
Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>();
Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>();
-
+
public IdDictionary(StormTopology topology) {
List<String> componentNames = new ArrayList<String>(topology.get_spouts().keySet());
componentNames.addAll(topology.get_bolts().keySet());
componentNames.addAll(topology.get_state_spouts().keySet());
-
+
for (String name : componentNames) {
ComponentCommon common = Utils.getComponentCommon(topology, name);
List<String> streams = new ArrayList<String>(common.get_streams().keySet());
@@ -149,15 +143,15 @@ public class SerializationFactory {
streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name)));
}
}
-
+
public int getStreamId(String component, String stream) {
return streamNametoId.get(component).get(stream);
}
-
+
public String getStreamName(String component, int stream) {
return streamIdToName.get(component).get(stream);
}
-
+
private static Map<String, Integer> idify(List<String> names) {
Collections.sort(names);
Map<String, Integer> ret = new HashMap<String, Integer>();
@@ -169,7 +163,7 @@ public class SerializationFactory {
return ret;
}
}
-
+
private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass, Map conf) {
try {
try {
@@ -201,7 +195,7 @@ public class SerializationFactory {
throw new IllegalArgumentException("Unable to create serializer \"" + serializerClass.getName() + "\" for class: " + superClass.getName(), ex);
}
}
-
+
private static Map<String, String> normalizeKryoRegister(Map conf) {
// TODO: de-duplicate this logic with the code in nimbus
Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER);
@@ -219,7 +213,7 @@ public class SerializationFactory {
}
}
}
-
+
// ensure always same order for registrations with TreeMap
return new TreeMap<String, String>(ret);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
index f5d03e4..ba37614 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
@@ -33,7 +33,7 @@ public class ThriftSerializationDelegate implements SerializationDelegate {
@Override
public byte[] serialize(Object object) {
try {
- return new TSerializer().serialize((TBase) object);
+ return new TSerializer().serialize((TBase) object);
} catch (TException e) {
throw new RuntimeException(e);
}
@@ -44,7 +44,7 @@ public class ThriftSerializationDelegate implements SerializationDelegate {
try {
TBase instance = (TBase) clazz.newInstance();
new TDeserializer().deserialize(instance, bytes);
- return (T)instance;
+ return (T) instance;
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
index 6b7e308..a4bac2f 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
@@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import java.util.ArrayList;
import java.util.Collection;
-
public class ArrayListSerializer extends CollectionSerializer {
@Override
public Collection create(Kryo kryo, Input input, Class<Collection> type) {
return new ArrayList();
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
index 662211b..00af80d 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
@@ -23,7 +23,6 @@ import com.esotericsoftware.kryo.serializers.MapSerializer;
import java.util.HashMap;
import java.util.Map;
-
public class HashMapSerializer extends MapSerializer {
@Override
public Map create(Kryo kryo, Input input, Class<Map> type) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
index 77fc353..eb3aab0 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
@@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import java.util.Collection;
import java.util.HashSet;
-
public class HashSetSerializer extends CollectionSerializer {
@Override
public Collection create(Kryo kryo, Input input, Class<Collection> type) {
return new HashSet();
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
index c71a19d..c65f16a 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
@@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import backtype.storm.utils.ListDelegate;
import java.util.Collection;
-
public class ListDelegateSerializer extends CollectionSerializer {
@Override
public Collection create(Kryo kryo, Input input, Class<Collection> type) {
return new ListDelegate();
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java b/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
index 5999fbb..9b837ba 100755
--- a/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
@@ -18,6 +18,7 @@
package backtype.storm.spout;
public interface IMultiSchemableSpout {
- MultiScheme getScheme();
- void setScheme(MultiScheme scheme);
+ MultiScheme getScheme();
+
+ void setScheme(MultiScheme scheme);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java b/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
index df455d9..7eca980 100755
--- a/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
@@ -17,8 +17,8 @@
*/
package backtype.storm.spout;
-
public interface ISchemableSpout {
- Scheme getScheme();
- void setScheme(Scheme scheme);
+ Scheme getScheme();
+
+ void setScheme(Scheme scheme);
}