You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/26 02:27:12 UTC
svn commit: r467819 - in /geronimo/sandbox/gcache/server/src:
main/java/org/apache/geronimo/gcache/transports/tcp/
test/java/org/apache/geronimo/gcache/transports/tcp/
Author: jgenender
Date: Wed Oct 25 17:27:10 2006
New Revision: 467819
URL: http://svn.apache.org/viewvc?view=rev&rev=467819
Log:
Round trip with acks and ack timeouts. Acks made opional
Added:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java (with props)
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java (with props)
Modified:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java Wed Oct 25 17:27:10 2006
@@ -21,4 +21,8 @@
public class Constants {
public final static byte MAGIC[] = new byte[]{'G', 'C', 'a', 'c', 'h', 'e'};
public final static int HEADER_SIZE = MAGIC.length + 1 + 4;
+
+ //Session attribute keys
+ public final static String BULK_COUNT = "BULK_COUNT_";
+ public final static String BULK_COMMAND_ID = "BULK_COMMAND_ID_";
}
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java?view=auto&rev=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java Wed Oct 25 17:27:10 2006
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+public class TCPBulkCommandFilter extends IoFilterAdapter {
+
+ public final static Log log = LogFactory.getLog(TCPBulkCommandFilter.class);
+
+ public static final String NAME = "BulkCommandFilter";
+
+ @Override
+ public void messageReceived(NextFilter nextFilter, IoSession sess,
+ Object obj) throws Exception {
+
+ BaseCommand command = (BaseCommand) obj;
+
+ String BULK_COUNT = Constants.BULK_COUNT + command.getCommandId();
+ String BULK_COMMAND_ID = Constants.BULK_COMMAND_ID
+ + command.getCommandId();
+
+ //Check to see if we are processing a bulk command
+ if (!sess.containsAttribute(Constants.BULK_COMMAND_ID + command.getCommandId())) {
+ //Not a bulk command, so process the command
+ nextFilter.messageReceived(sess, obj);
+ return;
+ }
+
+ Integer bulkCount = (Integer) sess.getAttribute(BULK_COUNT);
+
+ //If the bulk command hit zero, remove the BULK attributes
+ if (bulkCount == 0) {
+
+ //Remove the attributes
+ sess.removeAttribute(BULK_COUNT);
+ sess.removeAttribute(BULK_COMMAND_ID);
+
+ //Send the ack command back if required
+ if (sess.getFilterChain().get(TCPMessageAckCommandFilter.NAME) != null){
+ MessageAckCommand ack = new MessageAckCommand();
+ ack.setCommandId(command.getCommandId());
+ //Send the Ack
+ sess.write(ack);
+ }
+
+ } else {
+ //Decrease the command count
+ bulkCount -= 1;
+ sess.setAttribute(BULK_COUNT, bulkCount);
+ }
+
+ //Process the command
+ nextFilter.messageReceived(sess, obj);
+ }
+
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java Wed Oct 25 17:27:10 2006
@@ -31,6 +31,7 @@
import org.apache.geronimo.gcache.server.listeners.CacheNotifier;
import org.apache.geronimo.gcache.transports.Endpoint;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
public class TCPCacheNotifier extends CacheNotifier {
private Log log = LogFactory.getLog(TCPCacheNotifier.class);
@@ -67,8 +68,10 @@
log.error("Cannot marshal packet, cannot send to endpoints", e);
return;
}
-
+
TCPEndpoint origEndpoint = (TCPEndpoint) command.getAttachment();
+
+ //Spin through the client list
Set<Endpoint> set = info.getEndpointManager().getEndpoints();
synchronized (set) {
for (Endpoint endpoint : set) {
@@ -78,12 +81,20 @@
if (origEndpoint != null && origEndpoint.equals(tcp)) {
continue;
}
-
- tcp.getIoSession().write(buffer);
+
+ IoSession sess = tcp.getIoSession();
+ sess.write(buffer);
+
+ //See if we need to request an Ack
+ TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) sess
+ .getFilterChain().get(TCPMessageAckCommandFilter.NAME);
+ if (filter != null) {
+ long commandId = command.getCommandId();
+ filter.requestAck(commandId, sess);
+ }
}
}
}
-
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Wed Oct 25 17:27:10 2006
@@ -19,9 +19,6 @@
package org.apache.geronimo.gcache.transports.tcp;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -43,7 +40,6 @@
import org.apache.geronimo.gcache.command.RemoveEntryCommand;
import org.apache.geronimo.gcache.command.RemoveSessionCommand;
import org.apache.geronimo.gcache.transports.CommandVisitor;
-import org.apache.geronimo.gcache.util.BufferChannelWriter;
import org.apache.mina.common.IoSession;
public class TCPCommandVisitor implements CommandVisitor {
@@ -53,7 +49,10 @@
private TCPEndpoint endpoint;
+ private IoSession sess;
+
public TCPCommandVisitor(CacheInfoHolder infoHolder, IoSession sess) {
+ this.sess = sess;
endpoint = new TCPEndpoint(sess);
this.infoHolder = infoHolder;
}
@@ -67,8 +66,9 @@
cache.remove(command.getSessionId());
}
- // Notify peers
+ command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyRemoveSession(command);
+ sendAck(command);
}
@SuppressWarnings("unchecked")
@@ -100,9 +100,10 @@
cache.remove(command.getHashableKey());
}
- // Notify peers
command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyRemove(command);
+ sendAck(command);
+
}
public void processPutSession(PutSessionCommand command) {
@@ -113,25 +114,20 @@
cache.put(new Element(command.getSessionId(), command
.getRawSessionFromPayload()));
- // Ack the message
- MessageAckCommand ack = new MessageAckCommand();
- ack.setMessageId(command.getCommandId());
- IoSession sess = endpoint.getIoSession();
- if (sess != null)
- sess.write(ack);
+ command.setAttachment(endpoint);
+ infoHolder.getCacheNotifier().notifyPutSession(command);
+
+ sendAck(command);
} catch (IOException e) {
// TODO - What should we do on an IOException, ignore it or
- // remove the client?
+ // remove the client?
log.error(e);
}
- // Notify peers
- command.setAttachment(endpoint);
- infoHolder.getCacheNotifier().notifyPutSession(command);
}
- @SuppressWarnings({ "unchecked" })
+ @SuppressWarnings( { "unchecked" })
public void processPutEntry(PutEntryCommand command) {
Cache cache = infoHolder.getCache(command.getCacheName(), true);
@@ -159,29 +155,33 @@
.getRawPayload()));
}
- // Notify peers
command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyPut(command);
+ sendAck(command);
}
public void processMessageAck(MessageAckCommand command) {
+ //This should never get called as the filters will handle it
}
@SuppressWarnings("unchecked")
public void processGetCache(GetCacheCommand command) {
Cache cache = infoHolder.getCache(command.getCacheName(), true);
-
+
//Add the client endpoint
infoHolder.getEndpointManager().addEndpoint(endpoint);
IoSession sess = endpoint.getIoSession();
-
+
//Send a bulk command
BulkSendCommand bulk = new BulkSendCommand();
bulk.setNumberOfCommands(cache.getSize());
+
+ long commandId = bulk.getCommandId();
+
try {
if (sess != null)
sess.write(bulk);
-
+
for (Object key : (List<Object>) cache.getKeys()) {
Element element = cache.get(key);
Object payload = element.getValue();
@@ -198,19 +198,26 @@
PutEntryCommand pec = new PutEntryCommand();
pec.setCacheName(command.getCacheName());
pec.setRawPayload((byte[]) payload);
- pec.setRawKey((byte [])key);
+ pec.setRawKey((byte[]) key);
newCommand = pec;
}
-
+
+ //Set all the commands to the same commandId for a bulk send
+ newCommand.setCommandId(commandId);
+
//Send the packet. If there is a failure just abort
- if (sess != null)
+ if (sess != null) {
sess.write(newCommand);
+ }
}
-
+ //We are returning a request there for we don't send an ack...
+ //we request one
+ requestAck(bulk);
+
} catch (IOException e) {
// TODO - What should we do on an IOException, ignore it or
- // remove the client?
+ // remove the client?
log.error(e);
}
}
@@ -219,12 +226,53 @@
Cache cache = infoHolder.getCache(command.getCacheName(), true);
cache.removeAll();
- // Notify peers
command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyClearCache(command);
+
+ sendAck(command);
}
public void processBulkSend(BulkSendCommand command) {
+
+ //Get the command count and set the attribute to count em down
+ int commandCount = command.getNumberOfCommands();
+
+ if (sess == null)
+ return;
+
+ //Setup the BulkCommand filter to process bulk commands
+ sess.setAttribute(Constants.BULK_COUNT + command.getCommandId(),
+ commandCount);
+ sess.setAttribute(Constants.BULK_COMMAND_ID + command.getCommandId());
+
+ //Do not send an ack here, the filter will send it once all
+ //of the following commands have been received
+ }
+
+ private void sendAck(BaseCommand command){
+
+ if (sess == null)
+ return;
+
+ if (sess.getFilterChain().contains(TCPMessageAckCommandFilter.class)){
+ MessageAckCommand ack = new MessageAckCommand();
+ ack.setMessageId(command.getCommandId());
+ sess.write(ack);
+ }
+ }
+
+ private void requestAck(BaseCommand command) {
+
+ if (sess == null)
+ return;
+
+ //See if we need to send an Ack
+ TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) sess
+ .getFilterChain().get(TCPMessageAckCommandFilter.NAME);
+ if (filter != null) {
+ long commandId = command.getCommandId();
+ filter.requestAck(commandId, sess);
+ }
}
}
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java?view=auto&rev=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java Wed Oct 25 17:27:10 2006
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+public class TCPMessageAckCommandFilter extends IoFilterAdapter {
+
+ private static final Log log = LogFactory
+ .getLog(TCPMessageAckCommandFilter.class);
+
+ public final static String NAME = "MessageAckCommandFilter";
+
+ private final Timer timer = new Timer();
+
+ public Map<Long, AckTask> waitingAcks = new HashMap<Long, AckTask>();
+
+ private long ackTimeout;
+
+ public TCPMessageAckCommandFilter(long ackTimeout) {
+ super();
+ this.ackTimeout = ackTimeout;
+ }
+
+ @Override
+ public void messageReceived(NextFilter nextFilter, IoSession sess,
+ Object obj) throws Exception {
+
+ //Look for only MessageAckCommands
+ if (obj instanceof MessageAckCommand) {
+
+ MessageAckCommand command = (MessageAckCommand) obj;
+ Long commandId = command.getMessageId();
+
+ AckTask ack = waitingAcks.get(commandId);
+ if (ack != null){
+ //Cancel the timer
+ ack.cancel();
+
+ //Remove it off the watch list
+ waitingAcks.remove(commandId);
+ }
+
+ //Consume the message
+ return;
+ }
+
+ //Process the command
+ nextFilter.messageReceived(sess, obj);
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ //Shut down the timer.
+ timer.cancel();
+ }
+
+ public void requestAck(long commandId, IoSession sess) {
+ AckTask task = new AckTask(commandId, sess);
+ timer.schedule(task, ackTimeout);
+
+ }
+
+ class AckTask extends TimerTask {
+
+ private long commandId;
+
+ private IoSession sess;
+
+ public AckTask(long commandId, IoSession sess) {
+ this.commandId = commandId;
+ this.sess = sess;
+ }
+
+ @Override
+ public void run() {
+ log.error("Timeout waiting for Message ack for commandid="
+ + commandId + ", removing client.");
+ //Remove it off the watch list
+ waitingAcks.remove(commandId);
+ //Close the session, its no good since it cannot ack the message
+ sess.close();
+ }
+
+ }
+
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java Wed Oct 25 17:27:10 2006
@@ -27,9 +27,9 @@
*/
public class TCPSocketHandler extends IoHandlerAdapter {
- private CacheInfoHolder infoHolder = null;
+ private final CacheInfoHolder infoHolder;
- public TCPSocketHandler(CacheInfoHolder infoHolder) {
+ public TCPSocketHandler(final CacheInfoHolder infoHolder) {
this.infoHolder = infoHolder;
}
@@ -53,15 +53,9 @@
}
@Override
- public void messageSent(IoSession arg0, Object arg1) throws Exception {
- // TODO Auto-generated method stub
- super.messageSent(arg0, arg1);
- }
-
- @Override
public void sessionClosed(IoSession sess) throws Exception {
//Remove the client from the list
infoHolder.getEndpointManager().removeEndpoint(new TCPEndpoint(sess));
}
-
+
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java Wed Oct 25 17:27:10 2006
@@ -16,10 +16,11 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.gcache.CacheInfoHolder;
import org.apache.geronimo.gcache.transports.TransportService;
import org.apache.mina.common.IoAcceptor;
@@ -30,15 +31,47 @@
public class TCPSocketTransportService implements TransportService {
+ private final static Log log = LogFactory.getLog(TCPSocketTransportService.class);
+
+ public static final long DEFAULT_ACK_TIMEOUT = 10000;
+
IoAcceptor acceptor = null;
private CacheInfoHolder info;
private String uriString;
+ private boolean requireMessageAck = false;
+ private boolean enableLogging = false;
+ private long ackTimeout = DEFAULT_ACK_TIMEOUT;
+ private TCPMessageAckCommandFilter messageAckCommandFilter = null;
public TCPSocketTransportService(String uriString, CacheInfoHolder info) {
this.info = info;
this.uriString = uriString;
}
+ public boolean isRequireMessageAck() {
+ return requireMessageAck;
+ }
+
+ public void setRequireMessageAck(boolean requireMessageAck) {
+ this.requireMessageAck = requireMessageAck;
+ }
+
+ public long getAckTimeout() {
+ return ackTimeout;
+ }
+
+ public void setAckTimeout(long ackTimeout) {
+ this.ackTimeout = ackTimeout;
+ }
+
+ public boolean isEnableLogging() {
+ return enableLogging;
+ }
+
+ public void setEnableLogging(boolean enableLogging) {
+ this.enableLogging = enableLogging;
+ }
+
public void start() throws Exception {
URI uri = new URI(uriString);
int port = uri.getPort();
@@ -51,16 +84,40 @@
acceptor = new SocketAcceptor();
SocketAcceptorConfig cfg = new SocketAcceptorConfig();
cfg.setReuseAddress( true );
+
+ //Add the filter to hande the GCache Codec
cfg.getFilterChain().addLast( "protocolFilter", new ProtocolCodecFilter( new TCPCommandProtocolCodecFactory() ) );
- cfg.getFilterChain().addLast( "logFilter", new LoggingFilter() );
+
+ //Add the BulkCommandFilter for filtering when a BulkCommand is being read
+ cfg.getFilterChain().addLast( TCPBulkCommandFilter.NAME, new TCPBulkCommandFilter() );
+
+ //If we require message acks, then set up the filter
+ if (requireMessageAck){
+ messageAckCommandFilter = new TCPMessageAckCommandFilter(ackTimeout);
+ cfg.getFilterChain().addLast(TCPMessageAckCommandFilter.NAME, messageAckCommandFilter);
+ }
+
+ if (enableLogging){
+ cfg.getFilterChain().addLast( "logFilter", new LoggingFilter() );
+ }
acceptor.bind( inet, new TCPSocketHandler(info), cfg );
}
- public void stop() throws IOException {
+ public void stop() throws Exception {
+ if (messageAckCommandFilter != null){
+ try {
+ messageAckCommandFilter.destroy();
+ messageAckCommandFilter = null;
+ } catch (Exception e) {
+ log.error(e);
+ //Ignore since there is not much that can be done
+ }
+ }
if (acceptor != null){
acceptor.unbindAll();
+ acceptor = null;
}
}
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java?view=diff&rev=467819&r1=467818&r2=467819
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java Wed Oct 25 17:27:10 2006
@@ -39,7 +39,7 @@
public abstract class AbstractService {
protected String protocol = "tcp://localhost:45678";
- protected TransportService server = null;
+ protected TCPSocketTransportService server = null;
protected Socket client;
protected Socket client2;
protected CacheInfoHolder info;
@@ -53,6 +53,9 @@
URI uri = new URI(protocol);
server = new TCPSocketTransportService(protocol, info);
+ server.setAckTimeout(10000);
+ server.setRequireMessageAck(true);
+ server.setEnableLogging(true);
server.start();