You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/26 02:27:12 UTC

svn commit: r467819 - in /geronimo/sandbox/gcache/server/src: main/java/org/apache/geronimo/gcache/transports/tcp/ test/java/org/apache/geronimo/gcache/transports/tcp/

Author: jgenender
Date: Wed Oct 25 17:27:10 2006
New Revision: 467819

URL: http://svn.apache.org/viewvc?view=rev&rev=467819
Log:
Round trip with acks and ack timeouts.  Acks made opional

Added:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java   (with props)
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java   (with props)
Modified:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java Wed Oct 25 17:27:10 2006
@@ -21,4 +21,8 @@
 public class Constants {
     public final static byte MAGIC[] = new byte[]{'G', 'C', 'a', 'c', 'h', 'e'};
     public final static int HEADER_SIZE = MAGIC.length + 1 + 4;
+    
+    //Session attribute keys
+    public final static String BULK_COUNT = "BULK_COUNT_";
+    public final static String BULK_COMMAND_ID = "BULK_COMMAND_ID_";
 }

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java?view=auto&rev=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java Wed Oct 25 17:27:10 2006
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+public class TCPBulkCommandFilter extends IoFilterAdapter {
+
+    public final static Log log = LogFactory.getLog(TCPBulkCommandFilter.class);
+
+    public static final String NAME = "BulkCommandFilter";
+
+    @Override
+    public void messageReceived(NextFilter nextFilter, IoSession sess,
+	    Object obj) throws Exception {
+
+	BaseCommand command = (BaseCommand) obj;
+
+	String BULK_COUNT = Constants.BULK_COUNT + command.getCommandId();
+	String BULK_COMMAND_ID = Constants.BULK_COMMAND_ID
+		+ command.getCommandId();
+
+	//Check to see if we are processing a bulk command
+	if (!sess.containsAttribute(Constants.BULK_COMMAND_ID + command.getCommandId())) {
+	    //Not a bulk command, so process the command
+	    nextFilter.messageReceived(sess, obj);
+	    return;
+	}
+
+	Integer bulkCount = (Integer) sess.getAttribute(BULK_COUNT);
+
+	//If the bulk command hit zero, remove the BULK attributes
+	if (bulkCount == 0) {
+
+	    //Remove the attributes
+	    sess.removeAttribute(BULK_COUNT);
+	    sess.removeAttribute(BULK_COMMAND_ID);
+
+	    //Send the ack command back if required
+	    if (sess.getFilterChain().get(TCPMessageAckCommandFilter.NAME) != null){
+		MessageAckCommand ack = new MessageAckCommand();
+		ack.setCommandId(command.getCommandId());
+		//Send the Ack
+		sess.write(ack);
+	    }
+
+	} else {
+	    //Decrease the command count
+	    bulkCount -= 1;
+	    sess.setAttribute(BULK_COUNT, bulkCount);
+	}
+
+	//Process the command
+	nextFilter.messageReceived(sess, obj);
+    }
+
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java Wed Oct 25 17:27:10 2006
@@ -31,6 +31,7 @@
 import org.apache.geronimo.gcache.server.listeners.CacheNotifier;
 import org.apache.geronimo.gcache.transports.Endpoint;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
 
 public class TCPCacheNotifier extends CacheNotifier {
     private Log log = LogFactory.getLog(TCPCacheNotifier.class);
@@ -67,8 +68,10 @@
 	    log.error("Cannot marshal packet, cannot send to endpoints", e);
 	    return;
 	}
-	
+
 	TCPEndpoint origEndpoint = (TCPEndpoint) command.getAttachment();
+
+	//Spin through the client list
 	Set<Endpoint> set = info.getEndpointManager().getEndpoints();
 	synchronized (set) {
 	    for (Endpoint endpoint : set) {
@@ -78,12 +81,20 @@
 		if (origEndpoint != null && origEndpoint.equals(tcp)) {
 		    continue;
 		}
-		
-		tcp.getIoSession().write(buffer);
+
+		IoSession sess = tcp.getIoSession();
+		sess.write(buffer);
+
+		//See if we need to request an Ack
+		TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) sess
+			.getFilterChain().get(TCPMessageAckCommandFilter.NAME);
+		if (filter != null) {
+		    long commandId = command.getCommandId();
+		    filter.requestAck(commandId, sess);
+		}
 
 	    }
 	}
 
     }
-
 }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Wed Oct 25 17:27:10 2006
@@ -19,9 +19,6 @@
 package org.apache.geronimo.gcache.transports.tcp;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -43,7 +40,6 @@
 import org.apache.geronimo.gcache.command.RemoveEntryCommand;
 import org.apache.geronimo.gcache.command.RemoveSessionCommand;
 import org.apache.geronimo.gcache.transports.CommandVisitor;
-import org.apache.geronimo.gcache.util.BufferChannelWriter;
 import org.apache.mina.common.IoSession;
 
 public class TCPCommandVisitor implements CommandVisitor {
@@ -53,7 +49,10 @@
 
     private TCPEndpoint endpoint;
 
+    private IoSession sess;
+
     public TCPCommandVisitor(CacheInfoHolder infoHolder, IoSession sess) {
+	this.sess = sess;
 	endpoint = new TCPEndpoint(sess);
 	this.infoHolder = infoHolder;
     }
@@ -67,8 +66,9 @@
 	    cache.remove(command.getSessionId());
 	}
 
-	// Notify peers
+	command.setAttachment(endpoint);
 	infoHolder.getCacheNotifier().notifyRemoveSession(command);
+	sendAck(command);
     }
 
     @SuppressWarnings("unchecked")
@@ -100,9 +100,10 @@
 	    cache.remove(command.getHashableKey());
 	}
 
-	// Notify peers
 	command.setAttachment(endpoint);
 	infoHolder.getCacheNotifier().notifyRemove(command);
+	sendAck(command);
+
     }
 
     public void processPutSession(PutSessionCommand command) {
@@ -113,25 +114,20 @@
 	    cache.put(new Element(command.getSessionId(), command
 		    .getRawSessionFromPayload()));
 
-	    // Ack the message
-	    MessageAckCommand ack = new MessageAckCommand();
-	    ack.setMessageId(command.getCommandId());
-	    IoSession sess = endpoint.getIoSession();
-	    if (sess != null)
-		sess.write(ack);
+	    command.setAttachment(endpoint);
+	    infoHolder.getCacheNotifier().notifyPutSession(command);
+	    
+	    sendAck(command);
 	    
 	} catch (IOException e) {
 	    // TODO - What should we do on an IOException, ignore it or
-                // remove the client?
+	    // remove the client?
 	    log.error(e);
 	}
 
-	// Notify peers
-	command.setAttachment(endpoint);
-	infoHolder.getCacheNotifier().notifyPutSession(command);
     }
 
-    @SuppressWarnings({ "unchecked" })
+    @SuppressWarnings( { "unchecked" })
     public void processPutEntry(PutEntryCommand command) {
 	Cache cache = infoHolder.getCache(command.getCacheName(), true);
 
@@ -159,29 +155,33 @@
 		    .getRawPayload()));
 	}
 
-	// Notify peers
 	command.setAttachment(endpoint);
 	infoHolder.getCacheNotifier().notifyPut(command);
+	sendAck(command);
     }
 
     public void processMessageAck(MessageAckCommand command) {
+	//This should never get called as the filters will handle it
     }
 
     @SuppressWarnings("unchecked")
     public void processGetCache(GetCacheCommand command) {
 	Cache cache = infoHolder.getCache(command.getCacheName(), true);
-	
+
 	//Add the client endpoint
 	infoHolder.getEndpointManager().addEndpoint(endpoint);
 	IoSession sess = endpoint.getIoSession();
-	
+
 	//Send a bulk command
 	BulkSendCommand bulk = new BulkSendCommand();
 	bulk.setNumberOfCommands(cache.getSize());
+
+	long commandId = bulk.getCommandId();
+
 	try {
 	    if (sess != null)
 		sess.write(bulk);
-	    
+
 	    for (Object key : (List<Object>) cache.getKeys()) {
 		Element element = cache.get(key);
 		Object payload = element.getValue();
@@ -198,19 +198,26 @@
 		    PutEntryCommand pec = new PutEntryCommand();
 		    pec.setCacheName(command.getCacheName());
 		    pec.setRawPayload((byte[]) payload);
-		    pec.setRawKey((byte [])key);
+		    pec.setRawKey((byte[]) key);
 		    newCommand = pec;
 		}
-		
+
+		//Set all the commands to the same commandId for a bulk send
+		newCommand.setCommandId(commandId);
+
 		//Send the packet.  If there is a failure just abort
-		if (sess != null)
+		if (sess != null) {
 		    sess.write(newCommand);
+		}
 	    }
 	    
-	    
+	    //We are returning a request there for we don't send an ack...
+	    //we request one
+	    requestAck(bulk);
+
 	} catch (IOException e) {
 	    // TODO - What should we do on an IOException, ignore it or
-                // remove the client?
+	    // remove the client?
 	    log.error(e);
 	}
     }
@@ -219,12 +226,53 @@
 	Cache cache = infoHolder.getCache(command.getCacheName(), true);
 	cache.removeAll();
 
-	// Notify peers
 	command.setAttachment(endpoint);
 	infoHolder.getCacheNotifier().notifyClearCache(command);
+	
+	sendAck(command);
     }
 
     public void processBulkSend(BulkSendCommand command) {
+
+	//Get the command count and set the attribute to count em down
+	int commandCount = command.getNumberOfCommands();
+	
+	if (sess == null)
+	    return;
+
+	//Setup the BulkCommand filter to process bulk commands
+	sess.setAttribute(Constants.BULK_COUNT + command.getCommandId(),
+		commandCount);
+	sess.setAttribute(Constants.BULK_COMMAND_ID + command.getCommandId());
+	
+	//Do not send an ack here, the filter will send it once all
+	//of the following commands have been received
+    }
+    
+    private void sendAck(BaseCommand command){
+	
+	if (sess == null)
+	    return;
+	
+	if (sess.getFilterChain().contains(TCPMessageAckCommandFilter.class)){
+	   MessageAckCommand ack = new MessageAckCommand();
+	   ack.setMessageId(command.getCommandId());
+	   sess.write(ack);
+	}
+    }
+
+    private void requestAck(BaseCommand command) {
+
+	if (sess == null)
+	    return;
+	
+	//See if we need to send an Ack
+	TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) sess
+		.getFilterChain().get(TCPMessageAckCommandFilter.NAME);
+	if (filter != null) {
+	    long commandId = command.getCommandId();
+	    filter.requestAck(commandId, sess);
+	}
     }
 
 }

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java?view=auto&rev=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java Wed Oct 25 17:27:10 2006
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+public class TCPMessageAckCommandFilter extends IoFilterAdapter {
+
+    private static final Log log = LogFactory
+	    .getLog(TCPMessageAckCommandFilter.class);
+
+    public final static String NAME = "MessageAckCommandFilter";
+
+    private final Timer timer = new Timer();
+
+    public Map<Long, AckTask> waitingAcks = new HashMap<Long, AckTask>();
+
+    private long ackTimeout;
+
+    public TCPMessageAckCommandFilter(long ackTimeout) {
+	super();
+	this.ackTimeout = ackTimeout;
+    }
+
+    @Override
+    public void messageReceived(NextFilter nextFilter, IoSession sess,
+	    Object obj) throws Exception {
+
+	//Look for only MessageAckCommands
+	if (obj instanceof MessageAckCommand) {
+
+	    MessageAckCommand command = (MessageAckCommand) obj;
+	    Long commandId = command.getMessageId();
+	    
+	    AckTask ack = waitingAcks.get(commandId);
+	    if (ack != null){
+		//Cancel the timer
+		ack.cancel();
+
+		//Remove it off the watch list
+		waitingAcks.remove(commandId);
+	    }
+
+	    //Consume the message
+	    return;
+	}
+
+	//Process the command
+	nextFilter.messageReceived(sess, obj);
+    }
+
+    @Override
+    public void destroy() throws Exception {
+	//Shut down the timer.
+	timer.cancel();
+    }
+
+    public void requestAck(long commandId, IoSession sess) {
+	AckTask task = new AckTask(commandId, sess);
+	timer.schedule(task, ackTimeout);
+
+    }
+
+    class AckTask extends TimerTask {
+
+	private long commandId;
+
+	private IoSession sess;
+
+	public AckTask(long commandId, IoSession sess) {
+	    this.commandId = commandId;
+	    this.sess = sess;
+	}
+
+	@Override
+	public void run() {
+	    log.error("Timeout waiting for Message ack for commandid="
+		    + commandId + ", removing client.");
+	    //Remove it off the watch list
+	    waitingAcks.remove(commandId);
+	    //Close the session, its no good since it cannot ack the message
+	    sess.close();
+	}
+
+    }
+
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java Wed Oct 25 17:27:10 2006
@@ -27,9 +27,9 @@
  */
 public class TCPSocketHandler extends IoHandlerAdapter {
     
-    private CacheInfoHolder infoHolder = null;
+    private final CacheInfoHolder infoHolder;
     
-    public TCPSocketHandler(CacheInfoHolder infoHolder) {
+    public TCPSocketHandler(final CacheInfoHolder infoHolder) {
 	this.infoHolder = infoHolder;
     }
 
@@ -53,15 +53,9 @@
     }
 
     @Override
-    public void messageSent(IoSession arg0, Object arg1) throws Exception {
-	// TODO Auto-generated method stub
-	super.messageSent(arg0, arg1);
-    }
-
-    @Override
     public void sessionClosed(IoSession sess) throws Exception {
 	//Remove the client from the list
 	infoHolder.getEndpointManager().removeEndpoint(new TCPEndpoint(sess));
     }
-    
+
 }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java Wed Oct 25 17:27:10 2006
@@ -16,10 +16,11 @@
  */
 package org.apache.geronimo.gcache.transports.tcp;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.geronimo.gcache.CacheInfoHolder;
 import org.apache.geronimo.gcache.transports.TransportService;
 import org.apache.mina.common.IoAcceptor;
@@ -30,15 +31,47 @@
 
 public class TCPSocketTransportService implements TransportService {
     
+    private final static Log log = LogFactory.getLog(TCPSocketTransportService.class);
+    
+    public static final long DEFAULT_ACK_TIMEOUT = 10000;
+    
     IoAcceptor acceptor = null; 
     private CacheInfoHolder info;
     private String uriString;
+    private boolean requireMessageAck = false;
+    private boolean enableLogging = false;
+    private long ackTimeout = DEFAULT_ACK_TIMEOUT;
+    private TCPMessageAckCommandFilter messageAckCommandFilter = null;
 
     public TCPSocketTransportService(String uriString, CacheInfoHolder info) {
         this.info = info;
         this.uriString = uriString;
     }
 
+    public boolean isRequireMessageAck() {
+        return requireMessageAck;
+    }
+
+    public void setRequireMessageAck(boolean requireMessageAck) {
+        this.requireMessageAck = requireMessageAck;
+    }
+
+    public long getAckTimeout() {
+        return ackTimeout;
+    }
+
+    public void setAckTimeout(long ackTimeout) {
+        this.ackTimeout = ackTimeout;
+    }
+
+    public boolean isEnableLogging() {
+        return enableLogging;
+    }
+
+    public void setEnableLogging(boolean enableLogging) {
+        this.enableLogging = enableLogging;
+    }
+
     public void start() throws Exception {
 	URI uri = new URI(uriString);
 	int port = uri.getPort();
@@ -51,16 +84,40 @@
 	acceptor = new SocketAcceptor();
         SocketAcceptorConfig cfg = new SocketAcceptorConfig();
         cfg.setReuseAddress( true );
+        
+        //Add the filter to hande the GCache Codec
         cfg.getFilterChain().addLast( "protocolFilter", new ProtocolCodecFilter( new TCPCommandProtocolCodecFactory() ) );
-        cfg.getFilterChain().addLast( "logFilter", new LoggingFilter() );
+        
+        //Add the BulkCommandFilter for filtering when a BulkCommand is being read
+        cfg.getFilterChain().addLast( TCPBulkCommandFilter.NAME, new TCPBulkCommandFilter() );
+        
+        //If we require message acks, then set up the filter
+	if (requireMessageAck){
+	    messageAckCommandFilter = new TCPMessageAckCommandFilter(ackTimeout);
+	    cfg.getFilterChain().addLast(TCPMessageAckCommandFilter.NAME, messageAckCommandFilter);
+	}        
+	
+        if (enableLogging){
+            cfg.getFilterChain().addLast( "logFilter", new LoggingFilter() );
+        }
 
         acceptor.bind( inet, new TCPSocketHandler(info), cfg );
     }
 
-    public void stop() throws IOException {
+    public void stop() throws Exception {
 	
+	if (messageAckCommandFilter != null){
+	    try {
+		messageAckCommandFilter.destroy();
+		messageAckCommandFilter = null;
+	    } catch (Exception e) {
+		log.error(e);
+		//Ignore since there is not much that can be done
+	    }
+	}
 	if (acceptor != null){
 	    acceptor.unbindAll();
+	    acceptor = null;
 	}
     }
 

Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java Wed Oct 25 17:27:10 2006
@@ -39,7 +39,7 @@
 public abstract class AbstractService {
     protected String protocol = "tcp://localhost:45678";
 
-    protected TransportService server = null;
+    protected TCPSocketTransportService server = null;
     protected Socket client;
     protected Socket client2;
     protected CacheInfoHolder info;
@@ -53,6 +53,9 @@
         URI uri = new URI(protocol);
 
         server = new TCPSocketTransportService(protocol, info);
+        server.setAckTimeout(10000);
+        server.setRequireMessageAck(true);
+        server.setEnableLogging(true);
 
         server.start();