You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/27 04:55:29 UTC
svn commit: r468243 - in /geronimo/sandbox/gcache/server/src:
main/java/org/apache/geronimo/gcache/transports/tcp/
test/java/org/apache/geronimo/gcache/transports/tcp/
Author: jgenender
Date: Thu Oct 26 19:55:28 2006
New Revision: 468243
URL: http://svn.apache.org/viewvc?view=rev&rev=468243
Log:
Get unit tests to pass
Modified:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java Thu Oct 26 19:55:28 2006
@@ -27,5 +27,6 @@
public final static String AUTH_TASK = "AUTH_TASK";
public final static String BULK_COUNT = "BULK_COUNT_";
public final static String BULK_COMMAND_ID = "BULK_COMMAND_ID_";
+ public final static String MESSAGE_ACK_ID = "MESSAGE_ACK_ID_";
public final static String REMOTE_PUBLIC_KEY = "REMOTE_PUBLIC_KEY";
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java Thu Oct 26 19:55:28 2006
@@ -24,11 +24,11 @@
import org.apache.geronimo.gcache.CacheInfoHolder;
import org.apache.geronimo.gcache.command.AuthCommand;
import org.apache.geronimo.gcache.command.HandShakeCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
import org.apache.geronimo.gcache.command.PublicKeyCommand;
import org.apache.geronimo.gcache.util.CipherUtil;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.NextFilter;
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
@@ -39,8 +39,8 @@
public final static String NAME = "AuthenticationFilter";
-
private final String userId;
+
private final String password;
public TCPAuthenticationFilter(final String userId, final String password) {
@@ -49,7 +49,8 @@
}
@Override
- public void sessionOpened(NextFilter nextFilter, IoSession sess) throws Exception {
+ public void sessionOpened(NextFilter nextFilter, IoSession sess)
+ throws Exception {
//Start up the response timeout
ScheduledFuture task = ((TCPSocketHandler) sess.getHandler()).schedule(
new TimeoutTask(sess), 5000);
@@ -70,7 +71,7 @@
@Override
public void messageReceived(NextFilter nextFilter, IoSession sess,
- Object obj) throws Exception {
+ Object obj) throws Exception {
//If we have authenticated, continue on
if (sess.containsAttribute(Constants.AUTHENTICATED)) {
@@ -83,12 +84,11 @@
//If the session has a remote public key, then it's an Auth Command
if (sess.containsAttribute(Constants.REMOTE_PUBLIC_KEY)) {
handleAuth(sess, obj);
- return;
+ } else {
+ //No Remote Public Key, so this should be a handshake
+ handleHandShake(sess, obj);
}
- //No Remote Public Key, so this should be a handshake
- handleHandShake(sess, obj);
-
//Consume the message (don't pass it on)
nextFilter.messageReceived(sess, obj);
}
@@ -96,12 +96,15 @@
private void handleAuth(IoSession sess, Object obj) throws Exception {
if (!(obj instanceof AuthCommand)) {
//Nope...buh-bye...
- log.error("Expected AuthCommand but got " + obj.getClass().getSimpleName() + " from " + sess.getRemoteAddress().toString());
+ log.error("Expected AuthCommand but got "
+ + obj.getClass().getSimpleName() + " from "
+ + sess.getRemoteAddress().toString());
sess.close();
return;
}
- ScheduledFuture authTask = (ScheduledFuture) sess.getAttribute(Constants.AUTH_TASK);
+ ScheduledFuture authTask = (ScheduledFuture) sess
+ .getAttribute(Constants.AUTH_TASK);
//Cancel the timer
if (!authTask.cancel(false)) {
@@ -116,7 +119,8 @@
log.debug("User Id read was '" + authUserId + "'");
}
if (!userId.equals(authUserId)) {
- log.error("Authentication failure for " + sess.getRemoteAddress().toString());
+ log.error("Authentication failure for "
+ + sess.getRemoteAddress().toString());
sess.close();
return;
}
@@ -126,7 +130,8 @@
log.debug("Password read was '" + authPassword + "'");
}
if (!password.equals(authPassword)) {
- log.error("Authentication failure for " + sess.getRemoteAddress().toString());
+ log.error("Authentication failure for "
+ + sess.getRemoteAddress().toString());
sess.close();
return;
}
@@ -140,15 +145,16 @@
sess.setAttribute(Constants.AUTHENTICATED);
//Now add the client to the cache to start receiving events
- CacheInfoHolder infoHolder = ((TCPSocketHandler) sess.getHandler()).getInfoHolder();
+ CacheInfoHolder infoHolder = ((TCPSocketHandler) sess.getHandler())
+ .getInfoHolder();
infoHolder.getEndpointManager().addEndpoint(new TCPEndpoint(sess));
//See if we need to send an Ack
- TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) sess
- .getFilterChain().get(TCPMessageAckCommandFilter.NAME);
- if (filter != null) {
- long commandId = auth.getCommandId();
- filter.requestAck(commandId, sess);
+ if (sess.getFilterChain().contains(TCPMessageAckCommandFilter.NAME)){
+ log.debug("Sending MessageAck");
+ MessageAckCommand ack = new MessageAckCommand();
+ ack.setMessageId(auth.getCommandId());
+ sess.write(ack);
}
}
@@ -158,7 +164,9 @@
.getAttribute(Constants.AUTH_TASK);
if (!(obj instanceof HandShakeCommand)) {
- log.error("Expected HandShakeCommand but got " + obj.getClass().getSimpleName() + " from " + sess.getRemoteAddress().toString());
+ log.error("Expected HandShakeCommand but got "
+ + obj.getClass().getSimpleName() + " from "
+ + sess.getRemoteAddress().toString());
//Nope...buh-bye...
sess.close();
return;
@@ -187,11 +195,10 @@
sess.write(keyCommand);
//Now schedule a timeout for authorization
- ScheduledFuture authTask = ((TCPSocketHandler) sess.getHandler()).schedule(
- new TimeoutTask(sess), 5000);
+ ScheduledFuture authTask = ((TCPSocketHandler) sess.getHandler())
+ .schedule(new TimeoutTask(sess), 5000);
sess.setAttribute(Constants.AUTH_TASK, authTask);
-
}
class TimeoutTask implements Runnable {
@@ -203,9 +210,10 @@
}
public void run() {
- log.error("Timeout waiting for Handshake or Login from "
- + sess.getRemoteAddress().toString()
- + ", removing client.");
+ log
+ .error("Timeout waiting for Handshake or Login from "
+ + sess.getRemoteAddress().toString()
+ + ", removing client.");
//Close the session, its no good since it cannot authenticate
sess.close();
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java Thu Oct 26 19:55:28 2006
@@ -17,85 +17,83 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimerTask;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.gcache.command.MessageAckCommand;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
+
public class TCPMessageAckCommandFilter extends IoFilterAdapter {
- private static final Log log = LogFactory.getLog(TCPMessageAckCommandFilter.class);
+ private static final Log log = LogFactory
+ .getLog(TCPMessageAckCommandFilter.class);
public final static String NAME = "MessageAckCommandFilter";
-
- public Map<Long, AckTask> waitingAcks = new HashMap<Long, AckTask>();
-
private long ackTimeout;
public TCPMessageAckCommandFilter(long ackTimeout) {
- super();
- this.ackTimeout = ackTimeout;
+ if (log.isDebugEnabled())
+ log.debug("TCPMessageAckFilter created with a timeout of " + ackTimeout);
+ this.ackTimeout = ackTimeout;
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession sess,
- Object obj) throws Exception {
+ Object obj) throws Exception {
+
+ //Look for only MessageAckCommands
+ if (obj instanceof MessageAckCommand) {
- //Look for only MessageAckCommands
- if (obj instanceof MessageAckCommand) {
+ MessageAckCommand command = (MessageAckCommand) obj;
+ Long commandId = command.getMessageId();
- MessageAckCommand command = (MessageAckCommand) obj;
- Long commandId = command.getMessageId();
-
- AckTask ack = waitingAcks.get(commandId);
- if (ack != null){
- //Cancel the timer
- ack.cancel();
-
- //Remove it off the watch list
- waitingAcks.remove(commandId);
- }
-
- //Consume the message
- return;
- }
+ ScheduledFuture handle = (ScheduledFuture) sess.getAttribute(Constants.MESSAGE_ACK_ID + commandId);
+ if (handle != null) {
+ //Cancel the timer
+ if (!handle.cancel(false)){
+ //Too late, it's firing...this connection is done
+ return;
+ }
+
+ }
+
+ log.debug("MessageAck received.");
+
+ //Consume the message
+ //return;
+ }
- //Process the command
- nextFilter.messageReceived(sess, obj);
+ //Process the command
+ nextFilter.messageReceived(sess, obj);
}
public void requestAck(long commandId, IoSession sess) {
- AckTask task = new AckTask(commandId, sess);
- TCPSocketHandler handler = (TCPSocketHandler)sess.getHandler();
- handler.schedule(task, ackTimeout);
+ AckTask task = new AckTask(commandId, sess);
+ TCPSocketHandler handler = (TCPSocketHandler) sess.getHandler();
+ ScheduledFuture handle = handler.schedule(task, ackTimeout);
+ sess.setAttribute(Constants.MESSAGE_ACK_ID + commandId, handle);
}
- class AckTask extends TimerTask {
+ class AckTask implements Runnable {
- private long commandId;
+ private long commandId;
- private IoSession sess;
+ private IoSession sess;
- public AckTask(long commandId, IoSession sess) {
- this.commandId = commandId;
- this.sess = sess;
- }
-
- @Override
- public void run() {
- log.error("Timeout waiting for Message ack for commandid="
- + commandId + ", removing client.");
- //Remove it off the watch list
- waitingAcks.remove(commandId);
- //Close the session, its no good since it cannot ack the message
- sess.close();
- }
+ public AckTask(long commandId, IoSession sess) {
+ this.commandId = commandId;
+ this.sess = sess;
+ }
+
+ public void run() {
+ log.error("Timeout waiting for Message ack for commandid="
+ + commandId + ", removing client.");
+ //Close the session, its no good since it cannot ack the message
+ sess.close();
+ }
}
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java Thu Oct 26 19:55:28 2006
@@ -119,7 +119,7 @@
byte recBuf[] = new byte[Constants.HEADER_SIZE];
-// socket.setSoTimeout(2000);
+ socket.setSoTimeout(2000);
InputStream is = socket.getInputStream();
int read = is.read(recBuf);
assert read == Constants.HEADER_SIZE;
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java?view=diff&rev=468243&r1=468242&r2=468243
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java Thu Oct 26 19:55:28 2006
@@ -30,7 +30,9 @@
@Test
public void testJoinCluster() throws Exception{
- assert 0 == info.getEndpointManager().size();
+
+ //Should be 2 client end points
+ assert 2 == info.getEndpointManager().size();
GetCacheCommand command = new GetCacheCommand();
command.setCacheName("Cache1");
@@ -49,26 +51,22 @@
//Nothing is in the Cache, so no commands should follow
assert commandsToFollow == 0;
- //Should have one client
- assert 1 == info.getEndpointManager().size();
+ client.close();
- Set<Endpoint> set = info.getEndpointManager().getEndpoints();
- TCPEndpoint endpoint = (TCPEndpoint)set.iterator().next();
- InetSocketAddress remoteAddr = (InetSocketAddress)endpoint.getIoSession().getRemoteAddress();
- InetSocketAddress localAddr = (InetSocketAddress)client.getLocalSocketAddress();
+ //Give the server some time to figure out the client closed the connection
+ Thread.sleep(1000);
- //Check that the socket addresses match (Remote on server == Local for client)
- assert remoteAddr.getPort() == localAddr.getPort();
+ //The endpoint should have been removed
+ Set<Endpoint>set = info.getEndpointManager().getEndpoints();
+ assert set.size() == 1;
- client.close();
+ client2.close();
//Give the server some time to figure out the client closed the connection
Thread.sleep(1000);
- //The endpoint should have been removed
set = info.getEndpointManager().getEndpoints();
assert set.size() == 0;
-
}
}