You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/27 04:55:29 UTC

svn commit: r468243 - in /geronimo/sandbox/gcache/server/src: main/java/org/apache/geronimo/gcache/transports/tcp/ test/java/org/apache/geronimo/gcache/transports/tcp/

Author: jgenender
Date: Thu Oct 26 19:55:28 2006
New Revision: 468243

URL: http://svn.apache.org/viewvc?view=rev&rev=468243
Log:
Get unit tests to pass

Modified:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java Thu Oct 26 19:55:28 2006
@@ -27,5 +27,6 @@
     public final static String AUTH_TASK = "AUTH_TASK";
     public final static String BULK_COUNT = "BULK_COUNT_";
     public final static String BULK_COMMAND_ID = "BULK_COMMAND_ID_";
+    public final static String MESSAGE_ACK_ID = "MESSAGE_ACK_ID_";
     public final static String REMOTE_PUBLIC_KEY = "REMOTE_PUBLIC_KEY";
 }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java Thu Oct 26 19:55:28 2006
@@ -24,11 +24,11 @@
 import org.apache.geronimo.gcache.CacheInfoHolder;
 import org.apache.geronimo.gcache.command.AuthCommand;
 import org.apache.geronimo.gcache.command.HandShakeCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
 import org.apache.geronimo.gcache.command.PublicKeyCommand;
 import org.apache.geronimo.gcache.util.CipherUtil;
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.NextFilter;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
 
@@ -39,8 +39,8 @@
 
     public final static String NAME = "AuthenticationFilter";
 
-
     private final String userId;
+
     private final String password;
 
     public TCPAuthenticationFilter(final String userId, final String password) {
@@ -49,7 +49,8 @@
     }
 
     @Override
-    public void sessionOpened(NextFilter nextFilter, IoSession sess) throws Exception {
+    public void sessionOpened(NextFilter nextFilter, IoSession sess)
+            throws Exception {
         //Start up the response timeout
         ScheduledFuture task = ((TCPSocketHandler) sess.getHandler()).schedule(
                 new TimeoutTask(sess), 5000);
@@ -70,7 +71,7 @@
 
     @Override
     public void messageReceived(NextFilter nextFilter, IoSession sess,
-                                Object obj) throws Exception {
+            Object obj) throws Exception {
 
         //If we have authenticated, continue on
         if (sess.containsAttribute(Constants.AUTHENTICATED)) {
@@ -83,12 +84,11 @@
         //If the session has a remote public key, then it's an Auth Command
         if (sess.containsAttribute(Constants.REMOTE_PUBLIC_KEY)) {
             handleAuth(sess, obj);
-            return;
+        } else {
+            //No Remote Public Key, so this should be a handshake
+            handleHandShake(sess, obj);
         }
 
-        //No Remote Public Key, so this should be a handshake
-        handleHandShake(sess, obj);
-
         //Consume the message (don't pass it on)
         nextFilter.messageReceived(sess, obj);
     }
@@ -96,12 +96,15 @@
     private void handleAuth(IoSession sess, Object obj) throws Exception {
         if (!(obj instanceof AuthCommand)) {
             //Nope...buh-bye...
-            log.error("Expected AuthCommand but got " + obj.getClass().getSimpleName() + " from " + sess.getRemoteAddress().toString());
+            log.error("Expected AuthCommand but got "
+                    + obj.getClass().getSimpleName() + " from "
+                    + sess.getRemoteAddress().toString());
             sess.close();
             return;
         }
 
-        ScheduledFuture authTask = (ScheduledFuture) sess.getAttribute(Constants.AUTH_TASK);
+        ScheduledFuture authTask = (ScheduledFuture) sess
+                .getAttribute(Constants.AUTH_TASK);
 
         //Cancel the timer
         if (!authTask.cancel(false)) {
@@ -116,7 +119,8 @@
             log.debug("User Id read was '" + authUserId + "'");
         }
         if (!userId.equals(authUserId)) {
-            log.error("Authentication failure for " + sess.getRemoteAddress().toString());
+            log.error("Authentication failure for "
+                    + sess.getRemoteAddress().toString());
             sess.close();
             return;
         }
@@ -126,7 +130,8 @@
             log.debug("Password read was '" + authPassword + "'");
         }
         if (!password.equals(authPassword)) {
-            log.error("Authentication failure for " + sess.getRemoteAddress().toString());
+            log.error("Authentication failure for "
+                    + sess.getRemoteAddress().toString());
             sess.close();
             return;
         }
@@ -140,15 +145,16 @@
         sess.setAttribute(Constants.AUTHENTICATED);
 
         //Now add the client to the cache to start receiving events
-        CacheInfoHolder infoHolder = ((TCPSocketHandler) sess.getHandler()).getInfoHolder();
+        CacheInfoHolder infoHolder = ((TCPSocketHandler) sess.getHandler())
+                .getInfoHolder();
         infoHolder.getEndpointManager().addEndpoint(new TCPEndpoint(sess));
 
         //See if we need to send an Ack
-        TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) sess
-                .getFilterChain().get(TCPMessageAckCommandFilter.NAME);
-        if (filter != null) {
-            long commandId = auth.getCommandId();
-            filter.requestAck(commandId, sess);
+        if (sess.getFilterChain().contains(TCPMessageAckCommandFilter.NAME)){
+            log.debug("Sending MessageAck");
+            MessageAckCommand ack = new MessageAckCommand();
+            ack.setMessageId(auth.getCommandId());
+            sess.write(ack);
         }
 
     }
@@ -158,7 +164,9 @@
                 .getAttribute(Constants.AUTH_TASK);
 
         if (!(obj instanceof HandShakeCommand)) {
-            log.error("Expected HandShakeCommand but got " + obj.getClass().getSimpleName() + " from " + sess.getRemoteAddress().toString());
+            log.error("Expected HandShakeCommand but got "
+                    + obj.getClass().getSimpleName() + " from "
+                    + sess.getRemoteAddress().toString());
             //Nope...buh-bye...
             sess.close();
             return;
@@ -187,11 +195,10 @@
         sess.write(keyCommand);
 
         //Now schedule a timeout for authorization
-        ScheduledFuture authTask = ((TCPSocketHandler) sess.getHandler()).schedule(
-                new TimeoutTask(sess), 5000);
+        ScheduledFuture authTask = ((TCPSocketHandler) sess.getHandler())
+                .schedule(new TimeoutTask(sess), 5000);
         sess.setAttribute(Constants.AUTH_TASK, authTask);
 
-
     }
 
     class TimeoutTask implements Runnable {
@@ -203,9 +210,10 @@
         }
 
         public void run() {
-            log.error("Timeout waiting for Handshake or Login from "
-                    + sess.getRemoteAddress().toString()
-                    + ", removing client.");
+            log
+                    .error("Timeout waiting for Handshake or Login from "
+                            + sess.getRemoteAddress().toString()
+                            + ", removing client.");
             //Close the session, its no good since it cannot authenticate
             sess.close();
         }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java Thu Oct 26 19:55:28 2006
@@ -17,85 +17,83 @@
  */
 package org.apache.geronimo.gcache.transports.tcp;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimerTask;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.geronimo.gcache.command.MessageAckCommand;
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
+
 public class TCPMessageAckCommandFilter extends IoFilterAdapter {
 
-    private static final Log log = LogFactory.getLog(TCPMessageAckCommandFilter.class);
+    private static final Log log = LogFactory
+            .getLog(TCPMessageAckCommandFilter.class);
 
     public final static String NAME = "MessageAckCommandFilter";
 
-
-    public Map<Long, AckTask> waitingAcks = new HashMap<Long, AckTask>();
-
     private long ackTimeout;
 
     public TCPMessageAckCommandFilter(long ackTimeout) {
-	super();
-	this.ackTimeout = ackTimeout;
+        if (log.isDebugEnabled())
+            log.debug("TCPMessageAckFilter created with a timeout of " + ackTimeout);
+        this.ackTimeout = ackTimeout;
     }
 
     @Override
     public void messageReceived(NextFilter nextFilter, IoSession sess,
-	    Object obj) throws Exception {
+            Object obj) throws Exception {
+
+        //Look for only MessageAckCommands
+        if (obj instanceof MessageAckCommand) {
 
-	//Look for only MessageAckCommands
-	if (obj instanceof MessageAckCommand) {
+            MessageAckCommand command = (MessageAckCommand) obj;
+            Long commandId = command.getMessageId();
 
-	    MessageAckCommand command = (MessageAckCommand) obj;
-	    Long commandId = command.getMessageId();
-	    
-	    AckTask ack = waitingAcks.get(commandId);
-	    if (ack != null){
-		//Cancel the timer
-		ack.cancel();
-
-		//Remove it off the watch list
-		waitingAcks.remove(commandId);
-	    }
-
-	    //Consume the message
-	    return;
-	}
+            ScheduledFuture handle = (ScheduledFuture) sess.getAttribute(Constants.MESSAGE_ACK_ID + commandId);
+            if (handle != null) {
+                //Cancel the timer
+                if (!handle.cancel(false)){
+                    //Too late, it's firing...this connection is done
+                    return;
+                }
+                    
+            }
+            
+            log.debug("MessageAck received.");
+
+            //Consume the message
+            //return;
+        }
 
-	//Process the command
-	nextFilter.messageReceived(sess, obj);
+        //Process the command
+        nextFilter.messageReceived(sess, obj);
     }
 
     public void requestAck(long commandId, IoSession sess) {
-	AckTask task = new AckTask(commandId, sess);
-	TCPSocketHandler handler = (TCPSocketHandler)sess.getHandler();
-	handler.schedule(task, ackTimeout);
+        AckTask task = new AckTask(commandId, sess);
+        TCPSocketHandler handler = (TCPSocketHandler) sess.getHandler();
+        ScheduledFuture handle = handler.schedule(task, ackTimeout);
+        sess.setAttribute(Constants.MESSAGE_ACK_ID + commandId, handle);
     }
 
-    class AckTask extends TimerTask {
+    class AckTask implements Runnable {
 
-	private long commandId;
+        private long commandId;
 
-	private IoSession sess;
+        private IoSession sess;
 
-	public AckTask(long commandId, IoSession sess) {
-	    this.commandId = commandId;
-	    this.sess = sess;
-	}
-
-	@Override
-	public void run() {
-	    log.error("Timeout waiting for Message ack for commandid="
-		    + commandId + ", removing client.");
-	    //Remove it off the watch list
-	    waitingAcks.remove(commandId);
-	    //Close the session, its no good since it cannot ack the message
-	    sess.close();
-	}
+        public AckTask(long commandId, IoSession sess) {
+            this.commandId = commandId;
+            this.sess = sess;
+        }
+
+        public void run() {
+            log.error("Timeout waiting for Message ack for commandid="
+                    + commandId + ", removing client.");
+            //Close the session, its no good since it cannot ack the message
+            sess.close();
+        }
 
     }
 

Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java Thu Oct 26 19:55:28 2006
@@ -119,7 +119,7 @@
 
         byte recBuf[] = new byte[Constants.HEADER_SIZE];
 
-//	socket.setSoTimeout(2000);
+	    socket.setSoTimeout(2000);
         InputStream is = socket.getInputStream();
         int read = is.read(recBuf);
         assert read == Constants.HEADER_SIZE;

Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java Thu Oct 26 19:55:28 2006
@@ -30,7 +30,9 @@
     
     @Test
     public void testJoinCluster() throws Exception{
-        assert 0 == info.getEndpointManager().size();
+        
+        //Should be 2 client end points
+        assert 2 == info.getEndpointManager().size();
 	GetCacheCommand command = new GetCacheCommand();
 	
 	command.setCacheName("Cache1");
@@ -49,26 +51,22 @@
         //Nothing is in the Cache, so no commands should follow
         assert commandsToFollow == 0; 
         
-        //Should have one client
-        assert 1 == info.getEndpointManager().size();
+        client.close();
         
-        Set<Endpoint> set = info.getEndpointManager().getEndpoints();
-        TCPEndpoint endpoint =  (TCPEndpoint)set.iterator().next();
-        InetSocketAddress remoteAddr = (InetSocketAddress)endpoint.getIoSession().getRemoteAddress();
-        InetSocketAddress localAddr = (InetSocketAddress)client.getLocalSocketAddress();
+        //Give the server some time to figure out the client closed the connection
+        Thread.sleep(1000);
         
-        //Check that the socket addresses match (Remote on server == Local for client)
-        assert remoteAddr.getPort() == localAddr.getPort();
+        //The endpoint should have been removed
+        Set<Endpoint>set = info.getEndpointManager().getEndpoints();
+        assert set.size() == 1;
         
-        client.close();
+        client2.close();
         
         //Give the server some time to figure out the client closed the connection
         Thread.sleep(1000);
         
-        //The endpoint should have been removed
         set = info.getEndpointManager().getEndpoints();
         assert set.size() == 0;
-        
     }
 
 }