You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:10 UTC

[32/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
index 1788dab..d681236 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/digest/ServerCallbackHandler.java
@@ -48,26 +48,27 @@ public class ServerCallbackHandler implements CallbackHandler {
     private static final String SYSPROP_SUPER_PASSWORD = "storm.SASLAuthenticationProvider.superPassword";
 
     private String userName;
-    private final Map<String,String> credentials = new HashMap<String,String>();
+    private final Map<String, String> credentials = new HashMap<String, String>();
 
     public ServerCallbackHandler(Configuration configuration) throws IOException {
-        if (configuration==null) return;
+        if (configuration == null)
+            return;
 
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+            String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + "' entry in this configuration: Server cannot start.";
             throw new IOException(errorMessage);
         }
         credentials.clear();
-        for(AppConfigurationEntry entry: configurationEntries) {
-            Map<String,?> options = entry.getOptions();
+        for (AppConfigurationEntry entry : configurationEntries) {
+            Map<String, ?> options = entry.getOptions();
             // Populate DIGEST-MD5 user -> password map with JAAS configuration entries from the "Server" section.
             // Usernames are distinguished from other options by prefixing the username with a "user_" prefix.
-            for(Map.Entry<String, ?> pair : options.entrySet()) {
+            for (Map.Entry<String, ?> pair : options.entrySet()) {
                 String key = pair.getKey();
                 if (key.startsWith(USER_PREFIX)) {
                     String userName = key.substring(USER_PREFIX.length());
-                    credentials.put(userName,(String)pair.getValue());
+                    credentials.put(userName, (String) pair.getValue());
                 }
             }
         }
@@ -98,7 +99,7 @@ public class ServerCallbackHandler implements CallbackHandler {
         if ("super".equals(this.userName) && System.getProperty(SYSPROP_SUPER_PASSWORD) != null) {
             // superuser: use Java system property for password, if available.
             pc.setPassword(System.getProperty(SYSPROP_SUPER_PASSWORD).toCharArray());
-        } else if (credentials.containsKey(userName) ) {
+        } else if (credentials.containsKey(userName)) {
             pc.setPassword(credentials.get(userName).toCharArray());
         } else {
             LOG.warn("No password found for user: " + userName);
@@ -106,7 +107,7 @@ public class ServerCallbackHandler implements CallbackHandler {
     }
 
     private void handleRealmCallback(RealmCallback rc) {
-        LOG.debug("handleRealmCallback: "+ rc.getDefaultText());
+        LOG.debug("handleRealmCallback: " + rc.getDefaultText());
         rc.setText(rc.getDefaultText());
     }
 
@@ -114,14 +115,14 @@ public class ServerCallbackHandler implements CallbackHandler {
         String authenticationID = ac.getAuthenticationID();
         LOG.info("Successfully authenticated client: authenticationID = " + authenticationID + " authorizationID = " + ac.getAuthorizationID());
 
-        //if authorizationId is not set, set it to authenticationId.
-        if(ac.getAuthorizationID() == null) {
+        // if authorizationId is not set, set it to authenticationId.
+        if (ac.getAuthorizationID() == null) {
             ac.setAuthorizedID(authenticationID);
         }
 
-        //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
-        //add the authNid as the real user in reqContext's subject which will be used during authorization.
-        if(!authenticationID.equals(ac.getAuthorizationID())) {
+        // When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
+        // add the authNid as the real user in reqContext's subject which will be used during authorization.
+        if (!authenticationID.equals(ac.getAuthorizationID())) {
             LOG.info("Impersonation attempt  authenticationID = " + ac.getAuthenticationID() + " authorizationID = " + ac.getAuthorizationID());
             ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
index aed1c4f..116febb 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGT.java
@@ -63,7 +63,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
 
     private static KerberosTicket getTGT(Subject subject) {
         Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
-        for(KerberosTicket ticket: tickets) {
+        for (KerberosTicket ticket : tickets) {
             KerberosPrincipal server = ticket.getServer();
             if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
                 tickets = null;
@@ -72,26 +72,26 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
         }
         tickets = null;
         return null;
-    } 
+    }
 
     @Override
     public void populateCredentials(Map<String, String> credentials) {
-        //Log the user in and get the TGT
+        // Log the user in and get the TGT
         try {
             Configuration login_conf = AuthUtils.GetConfiguration(conf);
             ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
-        
-            //login our user
-            Configuration.setConfiguration(login_conf); 
+
+            // login our user
+            Configuration.setConfiguration(login_conf);
             LoginContext lc = new LoginContext(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
             try {
                 lc.login();
                 final Subject subject = lc.getSubject();
                 KerberosTicket tgt = getTGT(subject);
 
-                if (tgt == null) { //error
-                    throw new RuntimeException("Fail to verify user principal with section \""
-                            +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+                if (tgt == null) { // error
+                    throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_CLIENT
+                            + "\" in login configuration file " + login_conf);
                 }
 
                 if (!tgt.isForwardable()) {
@@ -102,7 +102,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
                     throw new RuntimeException("The TGT found is not renewable");
                 }
 
-                LOG.info("Pushing TGT for "+tgt.getClient()+" to topology.");
+                LOG.info("Pushing TGT for " + tgt.getClient() + " to topology.");
                 saveTGT(tgt, credentials);
             } finally {
                 lc.logout();
@@ -131,7 +131,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
             try {
                 ByteArrayInputStream bin = new ByteArrayInputStream(DatatypeConverter.parseBase64Binary(credentials.get("TGT")));
                 ObjectInputStream in = new ObjectInputStream(bin);
-                ret = (KerberosTicket)in.readObject();
+                ret = (KerberosTicket) in.readObject();
                 in.close();
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -155,16 +155,16 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
         KerberosTicket tgt = getTGT(credentials);
         if (tgt != null) {
             Set<Object> creds = subject.getPrivateCredentials();
-            synchronized(creds) {
+            synchronized (creds) {
                 Iterator<Object> iterator = creds.iterator();
                 while (iterator.hasNext()) {
                     Object o = iterator.next();
                     if (o instanceof KerberosTicket) {
-                        KerberosTicket t = (KerberosTicket)o;
+                        KerberosTicket t = (KerberosTicket) o;
                         iterator.remove();
                         try {
                             t.destroy();
-                        } catch (DestroyFailedException  e) {
+                        } catch (DestroyFailedException e) {
                             LOG.warn("Failed to destory ticket ", e);
                         }
                     }
@@ -179,8 +179,8 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
     }
 
     /**
-     * Hadoop does not just go off of a TGT, it needs a bit more.  This
-     * should fill in the rest.
+     * Hadoop does not just go off of a TGT, it needs a bit more. This should fill in the rest.
+     * 
      * @param subject the subject that should have a TGT in it.
      */
     private void loginHadoopUser(Subject subject) {
@@ -193,23 +193,21 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
         }
         try {
             Method isSecEnabled = ugi.getMethod("isSecurityEnabled");
-            if (!((Boolean)isSecEnabled.invoke(null))) {
-                LOG.warn("Hadoop is on the classpath but not configured for " +
-                  "security, if you want security you need to be sure that " +
-                  "hadoop.security.authentication=kerberos in core-site.xml " +
-                  "in your jar");
+            if (!((Boolean) isSecEnabled.invoke(null))) {
+                LOG.warn("Hadoop is on the classpath but not configured for " + "security, if you want security you need to be sure that "
+                        + "hadoop.security.authentication=kerberos in core-site.xml " + "in your jar");
                 return;
             }
- 
+
             try {
                 Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
                 login.invoke(null, subject);
             } catch (NoSuchMethodException me) {
-                //The version of Hadoop does not have the needed client changes.
+                // The version of Hadoop does not have the needed client changes.
                 // So don't look now, but do something really ugly to work around it.
                 // This is because we are reaching into the hidden bits of Hadoop security, and it works for now, but may stop at any point in time.
 
-                //We are just trying to do the following
+                // We are just trying to do the following
                 // Configuration conf = new Configuration();
                 // HadoopKerberosName.setConfiguration(conf);
                 // subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null));
@@ -220,7 +218,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
                 Constructor confCons = confClass.getConstructor();
                 Object conf = confCons.newInstance();
                 Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName");
-                Method hknSetConf = hknClass.getMethod("setConfiguration",confClass);
+                Method hknSetConf = hknClass.getMethod("setConfiguration", confClass);
                 hknSetConf.invoke(null, conf);
 
                 Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod");
@@ -236,7 +234,7 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
                 Constructor userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class);
                 userCons.setAccessible(true);
                 Object user = userCons.newInstance(name, kerbAuthMethod, null);
-                subject.getPrincipals().add((Principal)user);
+                subject.getPrincipals().add((Principal) user);
             }
         } catch (Exception e) {
             LOG.warn("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop may not be compatible.", e);
@@ -250,14 +248,14 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
     }
 
     @Override
-    public void renew(Map<String,String> credentials, Map topologyConf) {
+    public void renew(Map<String, String> credentials, Map topologyConf) {
         KerberosTicket tgt = getTGT(credentials);
         if (tgt != null) {
             long refreshTime = getRefreshTime(tgt);
             long now = System.currentTimeMillis();
             if (now >= refreshTime) {
                 try {
-                    LOG.info("Renewing TGT for "+tgt.getClient());
+                    LOG.info("Renewing TGT for " + tgt.getClient());
                     tgt.refresh();
                     saveTGT(tgt, credentials);
                 } catch (RefreshFailedException e) {
@@ -272,10 +270,10 @@ public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
         Map conf = new java.util.HashMap();
         conf.put("java.security.auth.login.config", args[0]);
         at.prepare(conf);
-        Map<String,String> creds = new java.util.HashMap<String,String>();
+        Map<String, String> creds = new java.util.HashMap<String, String>();
         at.populateCredentials(creds);
         Subject s = new Subject();
         at.populateSubject(s, creds);
-        LOG.info("Got a Subject "+s);
+        LOG.info("Got a Subject " + s);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
index 807abe3..647e240 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModule.java
@@ -29,7 +29,6 @@ import javax.security.auth.kerberos.KerberosTicket;
 import javax.security.auth.login.LoginException;
 import javax.security.auth.spi.LoginModule;
 
-
 /**
  * Custom LoginModule to enable Auto Login based on cached ticket
  */
@@ -41,10 +40,7 @@ public class AutoTGTKrb5LoginModule implements LoginModule {
 
     protected KerberosTicket kerbTicket = null;
 
-    public void initialize(Subject subject,
-                           CallbackHandler callbackHandler,
-                           Map<String, ?> sharedState,
-                           Map<String, ?> options) {
+    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
 
         this.subject = subject;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
index ba34fc9..6188566 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/AutoTGTKrb5LoginModuleTest.java
@@ -31,7 +31,7 @@ public class AutoTGTKrb5LoginModuleTest extends AutoTGTKrb5LoginModule {
     public void setKerbTicket(KerberosTicket ticket) {
         this.kerbTicket = ticket;
     }
-    
+
     @Override
     protected void getKerbTicketFromCache() {
         // Do nothing.

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
index d46aa8b..13a2cba 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ClientCallbackHandler.java
@@ -49,11 +49,11 @@ public class ClientCallbackHandler implements CallbackHandler {
      * @throws IOException
      */
     public ClientCallbackHandler(Configuration configuration) throws IOException {
-        if (configuration == null) return;
+        if (configuration == null)
+            return;
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_CLIENT);
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_CLIENT
-                    + "' entry in this configuration: Client cannot start.";
+            String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_CLIENT + "' entry in this configuration: Client cannot start.";
             LOG.error(errorMessage);
             throw new IOException(errorMessage);
         }
@@ -61,7 +61,8 @@ public class ClientCallbackHandler implements CallbackHandler {
 
     /**
      * This method is invoked by SASL for authentication challenges
-     * @param callbacks a collection of challenge callbacks 
+     * 
+     * @param callbacks a collection of challenge callbacks
      */
     public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
         for (Callback c : callbacks) {
@@ -69,20 +70,18 @@ public class ClientCallbackHandler implements CallbackHandler {
                 LOG.debug("name callback");
             } else if (c instanceof PasswordCallback) {
                 LOG.debug("password callback");
-                LOG.warn("Could not login: the client is being asked for a password, but the " +
-                        " client code does not currently support obtaining a password from the user." +
-                        " Make sure that the client is configured to use a ticket cache (using" +
-                        " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If" +
-                        " you still get this message after that, the TGT in the ticket cache has expired and must" +
-                        " be manually refreshed. To do so, first determine if you are using a password or a" +
-                        " keytab. If the former, run kinit in a Unix shell in the environment of the user who" +
-                        " is running this client using the command" +
-                        " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." +
-                        " If the latter, do" +
-                        " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and" +
-                        " <keytab> is the location of the keytab file). After manually refreshing your cache," +
-                        " restart this client. If you continue to see this message after manually refreshing" +
-                        " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
+                LOG.warn("Could not login: the client is being asked for a password, but the "
+                        + " client code does not currently support obtaining a password from the user."
+                        + " Make sure that the client is configured to use a ticket cache (using"
+                        + " the JAAS configuration setting 'useTicketCache=true)' and restart the client. If"
+                        + " you still get this message after that, the TGT in the ticket cache has expired and must"
+                        + " be manually refreshed. To do so, first determine if you are using a password or a"
+                        + " keytab. If the former, run kinit in a Unix shell in the environment of the user who" + " is running this client using the command"
+                        + " 'kinit <princ>' (where <princ> is the name of the client's Kerberos principal)." + " If the latter, do"
+                        + " 'kinit -k -t <keytab> <princ>' (where <princ> is the name of the Kerberos principal, and"
+                        + " <keytab> is the location of the keytab file). After manually refreshing your cache,"
+                        + " restart this client. If you continue to see this message after manually refreshing"
+                        + " your cache, ensure that your KDC host's clock is in sync with this host's clock.");
             } else if (c instanceof AuthorizeCallback) {
                 LOG.debug("authorization callback");
                 AuthorizeCallback ac = (AuthorizeCallback) c;
@@ -96,7 +95,7 @@ public class ClientCallbackHandler implements CallbackHandler {
                 if (ac.isAuthorized()) {
                     ac.setAuthorizedID(authzid);
                 }
-            }  else {
+            } else {
                 throw new UnsupportedCallbackException(c);
             }
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
index ecb0daf..e257a8a 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/KerberosSaslTransportPlugin.java
@@ -48,19 +48,19 @@ import backtype.storm.security.auth.AuthUtils;
 import backtype.storm.security.auth.SaslTransportPlugin;
 
 public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
-    public static final String KERBEROS = "GSSAPI"; 
+    public static final String KERBEROS = "GSSAPI";
     private static final Logger LOG = LoggerFactory.getLogger(KerberosSaslTransportPlugin.class);
 
     public TTransportFactory getServerTransportFactory() throws IOException {
-        //create an authentication callback handler
+        // create an authentication callback handler
         CallbackHandler server_callback_handler = new ServerCallbackHandler(login_conf, storm_conf);
-        
-        //login our principal
+
+        // login our principal
         Subject subject = null;
         try {
-            //specify a configuration object to be used
-            Configuration.setConfiguration(login_conf); 
-            //now login
+            // specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            // now login
             Login login = new Login(AuthUtils.LOGIN_CONTEXT_SERVER, server_callback_handler);
             subject = login.getSubject();
         } catch (LoginException ex) {
@@ -68,27 +68,27 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
             throw new RuntimeException(ex);
         }
 
-        //check the credential of our principal
-        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
-            throw new RuntimeException("Fail to verify user principal with section \""
-                    +AuthUtils.LOGIN_CONTEXT_SERVER+"\" in login configuration file "+ login_conf);
+        // check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+            throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_SERVER + "\" in login configuration file "
+                    + login_conf);
         }
 
-        String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal"); 
-        LOG.debug("principal:"+principal);  
+        String principal = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_SERVER, "principal");
+        LOG.debug("principal:" + principal);
         KerberosName serviceKerberosName = new KerberosName(principal);
         String serviceName = serviceKerberosName.getServiceName();
         String hostName = serviceKerberosName.getHostName();
-        Map<String, String> props = new TreeMap<String,String>();
+        Map<String, String> props = new TreeMap<String, String>();
         props.put(Sasl.QOP, "auth");
         props.put(Sasl.SERVER_AUTH, "false");
 
-        //create a transport factory that will invoke our auth callback for digest
+        // create a transport factory that will invoke our auth callback for digest
         TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
         factory.addServerDefinition(KERBEROS, serviceName, hostName, props, server_callback_handler);
 
-        //create a wrap transport factory so that we could apply user credential during connections
-        TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject); 
+        // create a wrap transport factory so that we could apply user credential during connections
+        TUGIAssumingTransportFactory wrapFactory = new TUGIAssumingTransportFactory(factory, subject);
 
         LOG.info("SASL GSSAPI transport factory will be used");
         return wrapFactory;
@@ -96,55 +96,47 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
 
     @Override
     public TTransport connect(TTransport transport, String serverHost, String asUser) throws TTransportException, IOException {
-        //create an authentication callback handler
+        // create an authentication callback handler
         ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
-        
-        //login our user
+
+        // login our user
         Login login = null;
-        try { 
-            //specify a configuration object to be used
-            Configuration.setConfiguration(login_conf); 
-            //now login
-            login  = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
+        try {
+            // specify a configuration object to be used
+            Configuration.setConfiguration(login_conf);
+            // now login
+            login = new Login(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
         } catch (LoginException ex) {
             LOG.error("Server failed to login in principal:" + ex, ex);
             throw new RuntimeException(ex);
         }
 
         final Subject subject = login.getSubject();
-        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { //error
-            throw new RuntimeException("Fail to verify user principal with section \""
-                        +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { // error
+            throw new RuntimeException("Fail to verify user principal with section \"" + AuthUtils.LOGIN_CONTEXT_CLIENT + "\" in login configuration file "
+                    + login_conf);
         }
 
         final String principal = StringUtils.isBlank(asUser) ? getPrincipal(subject) : asUser;
         String serviceName = AuthUtils.get(login_conf, AuthUtils.LOGIN_CONTEXT_CLIENT, "serviceName");
         if (serviceName == null) {
-            serviceName = AuthUtils.SERVICE; 
+            serviceName = AuthUtils.SERVICE;
         }
-        Map<String, String> props = new TreeMap<String,String>();
+        Map<String, String> props = new TreeMap<String, String>();
         props.put(Sasl.QOP, "auth");
         props.put(Sasl.SERVER_AUTH, "false");
 
         LOG.debug("SASL GSSAPI client transport is being established");
-        final TTransport sasalTransport = new TSaslClientTransport(KERBEROS, 
-                principal, 
-                serviceName, 
-                serverHost,
-                props,
-                null, 
-                transport);
-
-        //open Sasl transport with the login credential
+        final TTransport sasalTransport = new TSaslClientTransport(KERBEROS, principal, serviceName, serverHost, props, null, transport);
+
+        // open Sasl transport with the login credential
         try {
-            Subject.doAs(subject,
-                    new PrivilegedExceptionAction<Void>() {
+            Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
                 public Void run() {
                     try {
-                        LOG.debug("do as:"+ principal);
+                        LOG.debug("do as:" + principal);
                         sasalTransport.open();
-                    }
-                    catch (Exception e) {
+                    } catch (Exception e) {
                         LOG.error("Client failed to open SaslClientTransport to interact with a server during session initiation: " + e, e);
                     }
                     return null;
@@ -158,19 +150,18 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
     }
 
     private String getPrincipal(Subject subject) {
-        Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
-        if (principals==null || principals.size()<1) {
+        Set<Principal> principals = (Set<Principal>) subject.getPrincipals();
+        if (principals == null || principals.size() < 1) {
             LOG.info("No principal found in login subject");
             return null;
         }
-        return ((Principal)(principals.toArray()[0])).getName();
+        return ((Principal) (principals.toArray()[0])).getName();
     }
 
-    /** A TransportFactory that wraps another one, but assumes a specified UGI
-     * before calling through.                                                                                                                                                      
-     *                                                                                                                                                                              
-     * This is used on the server side to assume the server's Principal when accepting                                                                                              
-     * clients.                                                                                                                                                                     
+    /**
+     * A TransportFactory that wraps another one, but assumes a specified UGI before calling through.
+     * 
+     * This is used on the server side to assume the server's Principal when accepting clients.
      */
     static class TUGIAssumingTransportFactory extends TTransportFactory {
         private final Subject subject;
@@ -180,21 +171,19 @@ public class KerberosSaslTransportPlugin extends SaslTransportPlugin {
             this.wrapped = wrapped;
             this.subject = subject;
 
-            Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
-            if (principals.size()>0) 
-                LOG.info("Service principal:"+ ((Principal)(principals.toArray()[0])).getName());
+            Set<Principal> principals = (Set<Principal>) subject.getPrincipals();
+            if (principals.size() > 0)
+                LOG.info("Service principal:" + ((Principal) (principals.toArray()[0])).getName());
         }
 
         @Override
         public TTransport getTransport(final TTransport trans) {
             try {
-                return Subject.doAs(subject,
-                        new PrivilegedExceptionAction<TTransport>() {
+                return Subject.doAs(subject, new PrivilegedExceptionAction<TTransport>() {
                     public TTransport run() {
                         try {
                             return wrapped.getTransport(trans);
-                        }
-                        catch (Exception e) {
+                        } catch (Exception e) {
                             LOG.error("Storm server failed to open transport to interact with a client during session initiation: " + e, e);
                             return null;
                         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
index 7b143f0..0e32e0b 100755
--- a/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/auth/kerberos/ServerCallbackHandler.java
@@ -41,11 +41,12 @@ public class ServerCallbackHandler implements CallbackHandler {
     private String userName;
 
     public ServerCallbackHandler(Configuration configuration, Map stormConf) throws IOException {
-        if (configuration==null) return;
+        if (configuration == null)
+            return;
 
         AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(AuthUtils.LOGIN_CONTEXT_SERVER);
         if (configurationEntries == null) {
-            String errorMessage = "Could not find a '"+AuthUtils.LOGIN_CONTEXT_SERVER+"' entry in this configuration: Server cannot start.";
+            String errorMessage = "Could not find a '" + AuthUtils.LOGIN_CONTEXT_SERVER + "' entry in this configuration: Server cannot start.";
             LOG.error(errorMessage);
             throw new IOException(errorMessage);
         }
@@ -78,14 +79,14 @@ public class ServerCallbackHandler implements CallbackHandler {
         String authenticationID = ac.getAuthenticationID();
         LOG.info("Successfully authenticated client: authenticationID=" + authenticationID + " authorizationID= " + ac.getAuthorizationID());
 
-        //if authorizationId is not set, set it to authenticationId.
-        if(ac.getAuthorizationID() == null) {
+        // if authorizationId is not set, set it to authenticationId.
+        if (ac.getAuthorizationID() == null) {
             ac.setAuthorizedID(authenticationID);
         }
 
-        //When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
-        //add the authNid as the real user in reqContext's subject which will be used during authorization.
-        if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+        // When authNid and authZid are not equal , authNId is attempting to impersonate authZid, We
+        // add the authNid as the real user in reqContext's subject which will be used during authorization.
+        if (!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
             ReqContext.context().setRealPrincipal(new SaslTransportPlugin.User(ac.getAuthenticationID()));
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
index 8e66cdf..437cdbb 100644
--- a/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/security/serialization/BlowfishTupleSerializer.java
@@ -40,8 +40,9 @@ import backtype.storm.Config;
  */
 public class BlowfishTupleSerializer extends Serializer<ListDelegate> {
     /**
-     * The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory). 
-     * You should use in via "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ...".
+     * The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory). You should use it via
+     * "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ...".
      */
     public static String SECRET_KEY = "topology.tuple.serializer.blowfish.key";
     private static final Logger LOG = LoggerFactory.getLogger(BlowfishTupleSerializer.class);
@@ -50,12 +51,12 @@ public class BlowfishTupleSerializer extends Serializer<ListDelegate> {
     public BlowfishTupleSerializer(Kryo kryo, Map storm_conf) {
         String encryption_key = null;
         try {
-            encryption_key = (String)storm_conf.get(SECRET_KEY);
+            encryption_key = (String) storm_conf.get(SECRET_KEY);
             LOG.debug("Blowfish serializer being constructed ...");
             if (encryption_key == null) {
                 throw new RuntimeException("Blowfish encryption key not specified");
             }
-            byte[] bytes =  Hex.decodeHex(encryption_key.toCharArray());
+            byte[] bytes = Hex.decodeHex(encryption_key.toCharArray());
             _serializer = new BlowfishSerializer(new ListDelegateSerializer(), bytes);
         } catch (org.apache.commons.codec.DecoderException ex) {
             throw new RuntimeException("Blowfish encryption key invalid", ex);
@@ -69,22 +70,23 @@ public class BlowfishTupleSerializer extends Serializer<ListDelegate> {
 
     @Override
     public ListDelegate read(Kryo kryo, Input input, Class<ListDelegate> type) {
-        return (ListDelegate)_serializer.read(kryo, input, type);
+        return (ListDelegate) _serializer.read(kryo, input, type);
     }
 
     /**
      * Produce a blowfish key to be used in "Storm jar" command
      */
     public static void main(String[] args) {
-        try{
+        try {
             KeyGenerator kgen = KeyGenerator.getInstance("Blowfish");
             SecretKey skey = kgen.generateKey();
             byte[] raw = skey.getEncoded();
             String keyString = new String(Hex.encodeHex(raw));
-            System.out.println("storm -c "+SECRET_KEY+"="+keyString+" -c "+Config.TOPOLOGY_TUPLE_SERIALIZER+"="+BlowfishTupleSerializer.class.getName() + " ..." );
+            System.out.println("storm -c " + SECRET_KEY + "=" + keyString + " -c " + Config.TOPOLOGY_TUPLE_SERIALIZER + "="
+                    + BlowfishTupleSerializer.class.getName() + " ...");
         } catch (Exception ex) {
             LOG.error(ex.getMessage());
             ex.printStackTrace();
         }
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
index a055eb2..91e629a 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultKryoFactory.java
@@ -22,30 +22,29 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import java.util.Map;
 
-
 public class DefaultKryoFactory implements IKryoFactory {
 
     public static class KryoSerializableDefault extends Kryo {
         boolean _override = false;
-        
+
         public void overrideDefault(boolean value) {
             _override = value;
-        }                
-        
+        }
+
         @Override
         public Serializer getDefaultSerializer(Class type) {
-            if(_override) {
+            if (_override) {
                 return new SerializableSerializer();
             } else {
                 return super.getDefaultSerializer(type);
             }
-        }        
-    }    
-    
+        }
+    }
+
     @Override
     public Kryo getKryo(Map conf) {
         KryoSerializableDefault k = new KryoSerializableDefault();
-        k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)));        
+        k.setRegistrationRequired((Boolean) conf.get(Config.TOPOLOGY_KRYO_REGISTER_REQUIRED));
         k.setReferences(false);
         return k;
     }
@@ -53,12 +52,12 @@ public class DefaultKryoFactory implements IKryoFactory {
     @Override
     public void preRegister(Kryo k, Map conf) {
     }
-    
+
     public void postRegister(Kryo k, Map conf) {
-        ((KryoSerializableDefault)k).overrideDefault(true);
+        ((KryoSerializableDefault) k).overrideDefault((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION));
     }
 
     @Override
-    public void postDecorate(Kryo k, Map conf) {        
-    }    
+    public void postDecorate(Kryo k, Map conf) {
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
index 6d986af..c97470f 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/DefaultSerializationDelegate.java
@@ -48,10 +48,10 @@ public class DefaultSerializationDelegate implements SerializationDelegate {
             ObjectInputStream ois = new ObjectInputStream(bis);
             Object ret = ois.readObject();
             ois.close();
-            return (T)ret;
-        } catch(IOException ioe) {
+            return (T) ret;
+        } catch (IOException ioe) {
             throw new RuntimeException(ioe);
-        } catch(ClassNotFoundException e) {
+        } catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
index c8377c3..4b7951e 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeSerializationDelegate.java
@@ -22,8 +22,8 @@ import java.util.zip.GZIPInputStream;
 
 /**
  * Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes with gzip. If not, uses
- * {@link backtype.storm.serialization.DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled
- * via {@link #prepare(java.util.Map)} is passed through to both delegates.
+ * {@link DefaultSerializationDelegate} to deserialize. Any logic needing to be enabled via {@link #prepare(Map)} is
+ * passed through to both delegates.
  */
 @Deprecated
 public class GzipBridgeSerializationDelegate implements SerializationDelegate {
@@ -47,7 +47,7 @@ public class GzipBridgeSerializationDelegate implements SerializationDelegate {
         if (isGzipped(bytes)) {
             return gzipDelegate.deserialize(bytes, clazz);
         } else {
-            return defaultDelegate.deserialize(bytes,clazz);
+            return defaultDelegate.deserialize(bytes, clazz);
         }
     }
 
@@ -59,7 +59,6 @@ public class GzipBridgeSerializationDelegate implements SerializationDelegate {
      * Looks ahead to see if the GZIP magic constant is heading {@code bytes}
      */
     private boolean isGzipped(byte[] bytes) {
-        return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE)
-               && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
+        return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
index e5e77c3..6d580db 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipBridgeThriftSerializationDelegate.java
@@ -22,8 +22,8 @@ import java.util.zip.GZIPInputStream;
 
 /**
  * Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes with gzip. If not, uses
- * {@link backtype.storm.serialization.ThriftSerializationDelegate} to deserialize. Any logic needing to be enabled
- * via {@link #prepare(java.util.Map)} is passed through to both delegates.
+ * {@link ThriftSerializationDelegate} to deserialize. Any logic needing to be enabled via {@link #prepare(Map)} is
+ * passed through to both delegates.
  */
 public class GzipBridgeThriftSerializationDelegate implements SerializationDelegate {
 
@@ -46,7 +46,7 @@ public class GzipBridgeThriftSerializationDelegate implements SerializationDeleg
         if (isGzipped(bytes)) {
             return gzipDelegate.deserialize(bytes, clazz);
         } else {
-            return defaultDelegate.deserialize(bytes,clazz);
+            return defaultDelegate.deserialize(bytes, clazz);
         }
     }
 
@@ -58,7 +58,6 @@ public class GzipBridgeThriftSerializationDelegate implements SerializationDeleg
      * Looks ahead to see if the GZIP magic constant is heading {@code bytes}
      */
     private boolean isGzipped(byte[] bytes) {
-        return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE)
-               && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
+        return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE) && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
index 3c8ee8b..2b27af0 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipSerializationDelegate.java
@@ -54,10 +54,10 @@ public class GzipSerializationDelegate implements SerializationDelegate {
             ObjectInputStream ois = new ObjectInputStream(gis);
             Object ret = ois.readObject();
             ois.close();
-            return (T)ret;
-        } catch(IOException ioe) {
+            return (T) ret;
+        } catch (IOException ioe) {
             throw new RuntimeException(ioe);
-        } catch(ClassNotFoundException e) {
+        } catch (ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
index 933a125..a76c080 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/GzipThriftSerializationDelegate.java
@@ -49,7 +49,7 @@ public class GzipThriftSerializationDelegate implements SerializationDelegate {
         try {
             TBase instance = (TBase) clazz.newInstance();
             new TDeserializer().deserialize(instance, Utils.gunzip(bytes));
-            return (T)instance;
+            return (T) instance;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
index b154a36..36e59a5 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoDecorator.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package backtype.storm.serialization;
+
 import com.esotericsoftware.kryo.Kryo;
 
 public interface IKryoDecorator {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
index 60a847d..b5c4522 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/IKryoFactory.java
@@ -21,20 +21,18 @@ import com.esotericsoftware.kryo.Kryo;
 import java.util.Map;
 
 /**
- * An interface that controls the Kryo instance used by Storm for serialization.
- * The lifecycle is:
+ * An interface that controls the Kryo instance used by Storm for serialization. The lifecycle is:
  * 
- * 1. The Kryo instance is constructed using getKryo
- * 2. Storm registers the default classes (e.g. arrays, lists, maps, etc.)
- * 3. Storm calls preRegister hook
- * 4. Storm registers all user-defined registrations through topology.kryo.register
- * 5. Storm calls postRegister hook
- * 6. Storm calls all user-defined decorators through topology.kryo.decorators
- * 7. Storm calls postDecorate hook
+ * 1. The Kryo instance is constructed using getKryo 2. Storm registers the default classes (e.g. arrays, lists, maps, etc.) 3. Storm calls preRegister hook 4.
+ * Storm registers all user-defined registrations through topology.kryo.register 5. Storm calls postRegister hook 6. Storm calls all user-defined decorators
+ * through topology.kryo.decorators 7. Storm calls postDecorate hook
  */
 public interface IKryoFactory {
     Kryo getKryo(Map conf);
+
     void preRegister(Kryo k, Map conf);
+
     void postRegister(Kryo k, Map conf);
+
     void postDecorate(Kryo k, Map conf);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
index 4e68658..641a472 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleDeserializer.java
@@ -21,5 +21,5 @@ import backtype.storm.tuple.Tuple;
 import java.io.IOException;
 
 public interface ITupleDeserializer {
-    Tuple deserialize(byte[] ser);        
+    Tuple deserialize(byte[] ser);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
index 90ad932..68df8bf 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/ITupleSerializer.java
@@ -19,8 +19,7 @@ package backtype.storm.serialization;
 
 import backtype.storm.tuple.Tuple;
 
-
 public interface ITupleSerializer {
     byte[] serialize(Tuple tuple);
-//    long crc32(Tuple tuple);
+    // long crc32(Tuple tuple);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
index 3496e68..bb8bcb4 100644
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
@@ -40,55 +40,65 @@ import java.util.Map;
 
 public class KryoTupleDeserializer implements ITupleDeserializer {
     private static final Logger LOG = LoggerFactory.getLogger(KryoTupleDeserializer.class);
-    
+
     public static final boolean USE_RAW_PACKET = true;
-    
+
     GeneralTopologyContext _context;
     KryoValuesDeserializer _kryo;
     SerializationFactory.IdDictionary _ids;
     Input _kryoInput;
-    
+
     public KryoTupleDeserializer(final Map conf, final GeneralTopologyContext context) {
         _kryo = new KryoValuesDeserializer(conf);
         _context = context;
         _ids = new SerializationFactory.IdDictionary(context.getRawTopology());
         _kryoInput = new Input(1);
     }
-    
+
     public Tuple deserialize(byte[] ser) {
-        
+        _kryoInput.setBuffer(ser);
+        return deserialize(_kryoInput);
+    }
+
+    public Tuple deserialize(byte[] ser, int offset, int count) {
+        _kryoInput.setBuffer(ser, offset, count);
+        return deserialize(_kryoInput);
+    }
+
+    public Tuple deserialize(Input input) {
         int targetTaskId = 0;
+        long timeStamp = 0l;
         int taskId = 0;
         int streamId = 0;
         String componentName = null;
         String streamName = null;
         MessageId id = null;
-        
+
         try {
-            
-            _kryoInput.setBuffer(ser);
-            
-            targetTaskId = _kryoInput.readInt();
-            taskId = _kryoInput.readInt(true);
-            streamId = _kryoInput.readInt(true);
+            targetTaskId = input.readInt();
+            timeStamp = input.readLong();
+            taskId = input.readInt(true);
+            streamId = input.readInt(true);
             componentName = _context.getComponentId(taskId);
             streamName = _ids.getStreamName(componentName, streamId);
-            id = MessageId.deserialize(_kryoInput);
-            List<Object> values = _kryo.deserializeFrom(_kryoInput);
+            id = MessageId.deserialize(input);
+            List<Object> values = _kryo.deserializeFrom(input);
             TupleImplExt tuple = new TupleImplExt(_context, values, taskId, streamName, id);
             tuple.setTargetTaskId(targetTaskId);
+            tuple.setCreationTimeStamp(timeStamp);
             return tuple;
         } catch (Throwable e) {
             StringBuilder sb = new StringBuilder();
-            
+
             sb.append("Deserialize error:");
             sb.append("targetTaskId:").append(targetTaskId);
+            sb.append(",creationTimeStamp:").append(timeStamp);
             sb.append(",taskId:").append(taskId);
             sb.append(",streamId:").append(streamId);
             sb.append(",componentName:").append(componentName);
             sb.append(",streamName:").append(streamName);
             sb.append(",MessageId").append(id);
-            
+
             LOG.info(sb.toString(), e);
             throw new RuntimeException(e);
         }
@@ -99,15 +109,14 @@ public class KryoTupleDeserializer implements ITupleDeserializer {
         
         int offset = 0;
         while(offset < ser.length) {
-            int tupleSize = Utils.readIntFromByteArray(ser, offset);
+            _kryoInput.setBuffer(ser, offset, offset + 4);
+            int tupleSize = _kryoInput.readInt();
             offset += 4;
 
-            ByteBuffer buff = ByteBuffer.allocate(tupleSize);
-            buff.put(ser, offset, tupleSize);
-            ret.addToBatch(deserialize(buff.array()));
+            ret.addToBatch(deserialize(ser, offset, offset + tupleSize));
             offset += tupleSize;
         }
-        
+
         return ret;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
index 1c53d5d..e49e58b 100644
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
@@ -33,30 +33,35 @@ public class KryoTupleSerializer implements ITupleSerializer {
     KryoValuesSerializer _kryo;
     SerializationFactory.IdDictionary _ids;
     Output _kryoOut;
-    
+
     public KryoTupleSerializer(final Map conf, final GeneralTopologyContext context) {
         _kryo = new KryoValuesSerializer(conf);
         _kryoOut = new Output(2000, 2000000000);
         _ids = new SerializationFactory.IdDictionary(context.getRawTopology());
     }
-    
+
+    public byte[] serialize(Tuple tuple) {
+        _kryoOut.clear();
+        serializeTuple(_kryoOut, tuple);
+        return _kryoOut.toBytes();
+    }
     /**
      * @@@ in the future, it will skip serializing 'targetTask' by checking some flag
-     * @see backtype.storm.serialization.ITupleSerializer#serialize(int, backtype.storm.tuple.Tuple)
+     * @see ITupleSerializer#serialize(int, Tuple)
      */
-    public byte[] serialize(Tuple tuple) {
+    private void serializeTuple(Output output, Tuple tuple) {
         try {
-            
-            _kryoOut.clear();
             if (tuple instanceof TupleExt) {
-                _kryoOut.writeInt(((TupleExt) tuple).getTargetTaskId());
+                output.writeInt(((TupleExt) tuple).getTargetTaskId());
+                output.writeLong(((TupleExt) tuple).getCreationTimeStamp());
             }
-            
-            _kryoOut.writeInt(tuple.getSourceTask(), true);
-            _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
-            tuple.getMessageId().serialize(_kryoOut);
-            _kryo.serializeInto(tuple.getValues(), _kryoOut);
-            return _kryoOut.toBytes();
+
+            output.writeInt(tuple.getSourceTask(), true);
+            output.writeInt(
+                    _ids.getStreamId(tuple.getSourceComponent(),
+                            tuple.getSourceStreamId()), true);
+            tuple.getMessageId().serialize(output);
+            _kryo.serializeInto(tuple.getValues(), output);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -66,31 +71,28 @@ public class KryoTupleSerializer implements ITupleSerializer {
         if (batch == null || batch.currBatchSize() == 0)
             return null;
 
-        byte[][] bytes = new byte[batch.currBatchSize()][];
-        int i = 0, len = 0;
+        _kryoOut.clear();
         for (Tuple tuple : batch.getTuples()) {
-            /* byte structure: 
+            /* 
+             * byte structure: 
              * 1st tuple: length + tuple bytes
              * 2nd tuple: length + tuple bytes
              * ......
              */
-            bytes[i] = serialize(tuple);
-            len += bytes[i].length;
-            // add length bytes (int)
-            len += 4;
-            i++;
-        }
-
-        byte[] ret = new byte[len];
-        int index = 0;
-        for (i = 0; i < bytes.length; i++) {
-            Utils.writeIntToByteArray(ret, index, bytes[i].length);
-            index += 4;
-            for (int j = 0; j < bytes[i].length; j++) {
-                ret[index++] = bytes[i][j];
-            }
+            int startPos = _kryoOut.position();
+            
+            // Set initial value of tuple length, which will be updated accordingly after serialization
+            _kryoOut.writeInt(0);
+            
+            serializeTuple(_kryoOut, tuple);
+            
+            // Update the tuple length
+            int endPos = _kryoOut.position();
+            _kryoOut.setPosition(startPos);
+            _kryoOut.writeInt(endPos - startPos - 4);
+            _kryoOut.setPosition(endPos);
         }
-        return ret;
+        return _kryoOut.toBytes();
     }
 
     public static byte[] serialize(int targetTask) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
index 209ae53..45a7376 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
@@ -28,22 +28,22 @@ import java.util.Map;
 public class KryoValuesDeserializer {
     Kryo _kryo;
     Input _kryoInput;
-    
+
     public KryoValuesDeserializer(Map conf) {
         _kryo = SerializationFactory.getKryo(conf);
         _kryoInput = new Input(1);
     }
-    
+
     public List<Object> deserializeFrom(Input input) {
-    	ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
-   	return delegate.getDelegate();
+        ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
+        return delegate.getDelegate();
     }
-    
+
     public List<Object> deserialize(byte[] ser) throws IOException {
         _kryoInput.setBuffer(ser);
         return deserializeFrom(_kryoInput);
     }
-    
+
     public Object deserializeObject(byte[] ser) throws IOException {
         _kryoInput.setBuffer(ser);
         return _kryo.readClassAndObject(_kryoInput);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
index c4a2f71..d53f1bd 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
@@ -28,28 +28,28 @@ public class KryoValuesSerializer {
     Kryo _kryo;
     ListDelegate _delegate;
     Output _kryoOut;
-    
+
     public KryoValuesSerializer(Map conf) {
         _kryo = SerializationFactory.getKryo(conf);
         _delegate = new ListDelegate();
         _kryoOut = new Output(2000, 2000000000);
     }
-    
+
     public void serializeInto(List<Object> values, Output out) throws IOException {
         // this ensures that list of values is always written the same way, regardless
-        // of whether it's a java collection or one of clojure's persistent collections 
+        // of whether it's a java collection or one of clojure's persistent collections
         // (which have different serializers)
         // Doing this lets us deserialize as ArrayList and avoid writing the class here
         _delegate.setDelegate(values);
-        _kryo.writeObject(out, _delegate); 
+        _kryo.writeObject(out, _delegate);
     }
-    
+
     public byte[] serialize(List<Object> values) throws IOException {
         _kryoOut.clear();
         serializeInto(values, _kryoOut);
         return _kryoOut.toBytes();
     }
-    
+
     public byte[] serializeObject(Object obj) {
         _kryoOut.clear();
         _kryo.writeClassAndObject(_kryoOut, obj);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
index 376ad2a..b60e8b8 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/SerializableSerializer.java
@@ -30,7 +30,7 @@ import java.io.ObjectOutputStream;
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 
 public class SerializableSerializer extends Serializer<Object> {
-    
+
     @Override
     public void write(Kryo kryo, Output output, Object object) {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -45,7 +45,7 @@ public class SerializableSerializer extends Serializer<Object> {
         output.writeInt(ser.length);
         output.writeBytes(ser);
     }
-    
+
     @Override
     public Object read(Kryo kryo, Input input, Class c) {
         int len = input.readInt();

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java b/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
index ef859be..ebf6158 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/SerializationFactory.java
@@ -21,7 +21,6 @@ import backtype.storm.Config;
 import backtype.storm.generated.ComponentCommon;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.serialization.types.ArrayListSerializer;
-import backtype.storm.serialization.types.ListDelegateSerializer;
 import backtype.storm.serialization.types.HashMapSerializer;
 import backtype.storm.serialization.types.HashSetSerializer;
 import backtype.storm.transactional.TransactionAttempt;
@@ -33,27 +32,22 @@ import carbonite.JavaBridge;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigIntegerSerializer;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.math.BigInteger;
+import java.util.*;
+
 public class SerializationFactory {
     public static final Logger LOG = LoggerFactory.getLogger(SerializationFactory.class);
-    
+
     public static Kryo getKryo(Map conf) {
         IKryoFactory kryoFactory = (IKryoFactory) Utils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
         Kryo k = kryoFactory.getKryo(conf);
         if (WorkerClassLoader.getInstance() != null)
             k.setClassLoader(WorkerClassLoader.getInstance());
         k.register(byte[].class);
-        
+
         /* tuple payload serializer is specified via configuration */
         String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
         try {
@@ -63,7 +57,7 @@ public class SerializationFactory {
         } catch (ClassNotFoundException ex) {
             throw new RuntimeException(ex);
         }
-        
+
         k.register(ArrayList.class, new ArrayListSerializer());
         k.register(HashMap.class, new HashMapSerializer());
         k.register(HashSet.class, new HashSetSerializer());
@@ -78,17 +72,17 @@ public class SerializationFactory {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        
+
         Map<String, String> registrations = normalizeKryoRegister(conf);
-        
+
         kryoFactory.preRegister(k, conf);
-        
+
         boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
         for (String klassName : registrations.keySet()) {
             String serializerClassName = registrations.get(klassName);
             try {
                 Class klass = Class.forName(klassName, true, k.getClassLoader());
-                
+
                 Class serializerClass = null;
                 if (serializerClassName != null)
                     serializerClass = Class.forName(serializerClassName, true, k.getClassLoader());
@@ -105,9 +99,9 @@ public class SerializationFactory {
                 }
             }
         }
-        
+
         kryoFactory.postRegister(k, conf);
-        
+
         if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
             for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
                 try {
@@ -127,21 +121,21 @@ public class SerializationFactory {
                 }
             }
         }
-        
+
         kryoFactory.postDecorate(k, conf);
-        
+
         return k;
     }
-    
+
     public static class IdDictionary {
         Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>();
         Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>();
-        
+
         public IdDictionary(StormTopology topology) {
             List<String> componentNames = new ArrayList<String>(topology.get_spouts().keySet());
             componentNames.addAll(topology.get_bolts().keySet());
             componentNames.addAll(topology.get_state_spouts().keySet());
-            
+
             for (String name : componentNames) {
                 ComponentCommon common = Utils.getComponentCommon(topology, name);
                 List<String> streams = new ArrayList<String>(common.get_streams().keySet());
@@ -149,15 +143,15 @@ public class SerializationFactory {
                 streamIdToName.put(name, Utils.reverseMap(streamNametoId.get(name)));
             }
         }
-        
+
         public int getStreamId(String component, String stream) {
             return streamNametoId.get(component).get(stream);
         }
-        
+
         public String getStreamName(String component, int stream) {
             return streamIdToName.get(component).get(stream);
         }
-        
+
         private static Map<String, Integer> idify(List<String> names) {
             Collections.sort(names);
             Map<String, Integer> ret = new HashMap<String, Integer>();
@@ -169,7 +163,7 @@ public class SerializationFactory {
             return ret;
         }
     }
-    
+
     private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass, Map conf) {
         try {
             try {
@@ -201,7 +195,7 @@ public class SerializationFactory {
             throw new IllegalArgumentException("Unable to create serializer \"" + serializerClass.getName() + "\" for class: " + superClass.getName(), ex);
         }
     }
-    
+
     private static Map<String, String> normalizeKryoRegister(Map conf) {
         // TODO: de-duplicate this logic with the code in nimbus
         Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER);
@@ -219,7 +213,7 @@ public class SerializationFactory {
                 }
             }
         }
-        
+
         // ensure always same order for registrations with TreeMap
         return new TreeMap<String, String>(ret);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java b/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
index f5d03e4..ba37614 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/ThriftSerializationDelegate.java
@@ -33,7 +33,7 @@ public class ThriftSerializationDelegate implements SerializationDelegate {
     @Override
     public byte[] serialize(Object object) {
         try {
-            return  new TSerializer().serialize((TBase) object);
+            return new TSerializer().serialize((TBase) object);
         } catch (TException e) {
             throw new RuntimeException(e);
         }
@@ -44,7 +44,7 @@ public class ThriftSerializationDelegate implements SerializationDelegate {
         try {
             TBase instance = (TBase) clazz.newInstance();
             new TDeserializer().deserialize(instance, bytes);
-            return (T)instance;
+            return (T) instance;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
index 6b7e308..a4bac2f 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
@@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
 import java.util.ArrayList;
 import java.util.Collection;
 
-
 public class ArrayListSerializer extends CollectionSerializer {
     @Override
     public Collection create(Kryo kryo, Input input, Class<Collection> type) {
         return new ArrayList();
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
index 662211b..00af80d 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
@@ -23,7 +23,6 @@ import com.esotericsoftware.kryo.serializers.MapSerializer;
 import java.util.HashMap;
 import java.util.Map;
 
-
 public class HashMapSerializer extends MapSerializer {
     @Override
     public Map create(Kryo kryo, Input input, Class<Map> type) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
index 77fc353..eb3aab0 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
@@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
 import java.util.Collection;
 import java.util.HashSet;
 
-
 public class HashSetSerializer extends CollectionSerializer {
     @Override
     public Collection create(Kryo kryo, Input input, Class<Collection> type) {
         return new HashSet();
-    }       
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java b/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
index c71a19d..c65f16a 100755
--- a/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
+++ b/jstorm-core/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
@@ -23,10 +23,9 @@ import com.esotericsoftware.kryo.serializers.CollectionSerializer;
 import backtype.storm.utils.ListDelegate;
 import java.util.Collection;
 
-
 public class ListDelegateSerializer extends CollectionSerializer {
     @Override
     public Collection create(Kryo kryo, Input input, Class<Collection> type) {
         return new ListDelegate();
-    }    
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java b/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
index 5999fbb..9b837ba 100755
--- a/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
@@ -18,6 +18,7 @@
 package backtype.storm.spout;
 
 public interface IMultiSchemableSpout {
-  MultiScheme getScheme();
-  void setScheme(MultiScheme scheme);
+    MultiScheme getScheme();
+
+    void setScheme(MultiScheme scheme);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java b/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
index df455d9..7eca980 100755
--- a/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/spout/ISchemableSpout.java
@@ -17,8 +17,8 @@
  */
 package backtype.storm.spout;
 
-
 public interface ISchemableSpout {
-     Scheme getScheme();
-     void setScheme(Scheme scheme);
+    Scheme getScheme();
+
+    void setScheme(Scheme scheme);
 }