You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/11/20 18:52:42 UTC
svn commit: r477271 - in /geronimo/sandbox/gcache:
common/src/main/java/org/apache/geronimo/gcache/
common/src/main/java/org/apache/geronimo/gcache/command/
common/src/main/java/org/apache/geronimo/gcache/transports/
common/src/main/java/org/apache/ger...
Author: jgenender
Date: Mon Nov 20 09:52:40 2006
New Revision: 477271
URL: http://svn.apache.org/viewvc?view=rev&rev=477271
Log:
Refactoring for shared code between client and server
Added:
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java (with props)
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java (with props)
Removed:
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/config/
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/core/
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/network/
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java
Modified:
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/command/Command.java
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/discovery/multicast/MulticastDiscoveryAgent.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheManager.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/server/GCacheManagerTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorClearCacheCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutEntryCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutSessionCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveEntryCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveSessionCommandTest.java
Modified: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java (original)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java Mon Nov 20 09:52:40 2006
@@ -25,12 +25,14 @@
import org.apache.geronimo.gcache.listeners.DefaultCacheNotifier;
import org.apache.geronimo.gcache.transports.EndpointManager;
import org.apache.geronimo.gcache.transports.DiscoveryManager;
+import org.apache.geronimo.gcache.transports.CommandVisitor;
public class CacheInfoHolder {
private final CacheManager cacheManager;
private CacheNotifier cacheNotifier = null;
private EndpointManager endpointManager;
private DiscoveryManager discoveryManager;
+ private CommandVisitor commandVisitor;
public CacheInfoHolder(CacheManager cacheManager) {
this.cacheManager = cacheManager;
@@ -61,6 +63,14 @@
public void setCacheNotifier(CacheNotifier cacheNotifier) {
this.cacheNotifier = cacheNotifier;
cacheNotifier.setInfo(this);
+ }
+
+ public CommandVisitor getCommandVisitor() {
+ return commandVisitor;
+ }
+
+ public void setCommandVisitor(CommandVisitor commandVisitor) {
+ this.commandVisitor = commandVisitor;
}
public Cache getCache(String cacheName, boolean create){
Modified: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/command/Command.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/command/Command.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/command/Command.java (original)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/command/Command.java Mon Nov 20 09:52:40 2006
@@ -33,4 +33,8 @@
public byte getCommandType() throws IOException;
public void execute(CommandVisitor visitor) throws Exception;
+
+ public void setAttachment(Object o);
+
+ public Object getAttachment();
}
Modified: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java (original)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java Mon Nov 20 09:52:40 2006
@@ -17,25 +17,24 @@
*/
package org.apache.geronimo.gcache.transports;
-import java.util.Collections;
-import java.util.HashSet;
+import org.apache.mina.common.IoSession;
+
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
public class EndpointManager {
CopyOnWriteArraySet endpoints = new CopyOnWriteArraySet();
- public void addEndpoint(Endpoint endpoint) {
+ public void addEndpoint(IoSession endpoint) {
endpoints.add(endpoint);
}
- public void removeEndpoint(Endpoint endpoint) {
+ public void removeEndpoint(IoSession endpoint) {
endpoints.remove(endpoint);
}
- public boolean contains(Endpoint endpoint) {
+ public boolean contains(IoSession endpoint) {
return endpoints.contains(endpoint);
}
@@ -43,7 +42,7 @@
return endpoints.size();
}
- public Set<Endpoint> getEndpoints() {
+ public Set<IoSession> getEndpoints() {
return endpoints;
}
Modified: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/discovery/multicast/MulticastDiscoveryAgent.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/discovery/multicast/MulticastDiscoveryAgent.java Mon Nov 20 09:52:40 2006
@@ -93,7 +93,7 @@
private int order = 0;
private final Executor executor = new ThreadPoolExecutor(1, 1, 30,
- TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable,
"Multicast Transport Service Notifier");
@@ -195,7 +195,6 @@
this.inetAddress = InetAddress.getByName(host);
this.sockAddress = new InetSocketAddress(this.inetAddress, port);
- InetSocketAddress inet = new InetSocketAddress(host, port);
mcast = new MulticastSocket(port);
mcast.setLoopbackMode(loopBackMode);
mcast.setTimeToLive(timeToLive);
@@ -282,8 +281,8 @@
private void doExpireOldServices() {
long expireTime = System.currentTimeMillis()
- (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
- for (Iterator i = services.entrySet().iterator(); i.hasNext();) {
- Map.Entry entry = (Map.Entry) i.next();
+ for (Object o : services.entrySet()) {
+ Map.Entry entry = (Map.Entry) o;
AtomicLong lastHeartBeat = (AtomicLong) entry.getValue();
if (lastHeartBeat.get() < expireTime) {
String nodeName = (String) nodes.get(entry.getKey());
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+/**
+ * Constants shared by the gcache TCP transport: the wire-format header
+ * layout and the keys under which per-session state is stored.
+ */
+public class Constants {
+ /** Magic bytes that open every command header. */
+ public static final byte[] MAGIC = {'G', 'C', 'a', 'c', 'h', 'e'};
+
+ /** Header length: the magic, a 1 byte command identifier and a 4 byte command length. */
+ public static final int HEADER_SIZE = MAGIC.length + 1 + 4;
+
+ //Session attribute keys
+
+ /** Marker attribute set on a session once authentication has passed. */
+ public static final String AUTHENTICATED = "AUTHENTICATED";
+
+ /** Holds the pending handshake/login timeout task of a session. */
+ public static final String AUTH_TASK = "AUTH_TASK";
+
+ /** Key prefix; the bulk command filter appends the command id to it. */
+ public static final String BULK_COUNT = "BULK_COUNT_";
+
+ /** Key prefix; the bulk command filter appends the command id to it. */
+ public static final String BULK_COMMAND_ID = "BULK_COMMAND_ID_";
+
+ /** Presumably a key prefix as well (trailing underscore); its user is not part of this file. */
+ public static final String MESSAGE_ACK_ID = "MESSAGE_ACK_ID_";
+
+ /** Holds the public key received from the remote side during the handshake. */
+ public static final String REMOTE_PUBLIC_KEY = "REMOTE_PUBLIC_KEY";
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,224 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.security.PublicKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.AuthCommand;
+import org.apache.geronimo.gcache.command.HandShakeCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.geronimo.gcache.command.PublicKeyCommand;
+import org.apache.geronimo.gcache.util.CipherUtil;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
+
+/**
+ * MINA filter that keeps a session closed to regular traffic until the
+ * remote side has completed a two step exchange: first a
+ * {@link HandShakeCommand} carrying its public key, then an
+ * {@link AuthCommand} whose user id and password equal the ones this filter
+ * was constructed with. Each step is guarded by a timeout task that closes
+ * the session. Messages received on an authenticated session are passed
+ * down the chain untouched; handshake and login messages are consumed here.
+ */
+public class TCPAuthenticationFilter extends IoFilterAdapter {
+
+ private static final Log log = LogFactory
+ .getLog(TCPAuthenticationFilter.class);
+
+ public final static String NAME = "AuthenticationFilter";
+
+ /** Milliseconds the remote side gets for each step before the session is closed. */
+ private static final int RESPONSE_TIMEOUT_MILLIS = 5000;
+
+ private final String userId;
+
+ private final String password;
+
+ /**
+ * @param userId user id an AuthCommand must carry to be accepted
+ * @param password password an AuthCommand must carry to be accepted
+ */
+ public TCPAuthenticationFilter(final String userId, final String password) {
+ this.userId = userId;
+ this.password = password;
+ }
+
+ /**
+ * Starts the handshake timeout for the freshly opened session and then
+ * forwards the event.
+ */
+ @Override
+ public void sessionOpened(NextFilter nextFilter, IoSession sess)
+ throws Exception {
+
+ //Start up the response timeout
+ sess.setAttribute(Constants.AUTH_TASK, scheduleTimeout(sess));
+
+ //Forward the event that was actually received. Forwarding
+ //sessionCreated here would hide sessionOpened from the rest of the chain.
+ nextFilter.sessionOpened(sess);
+ }
+
+ /**
+ * Passes the message on when the session is authenticated; otherwise
+ * treats it as the next step of the handshake/login exchange and
+ * consumes it.
+ */
+ @Override
+ public void messageReceived(NextFilter nextFilter, IoSession sess,
+ Object obj) throws Exception {
+
+ //If we have authenticated, continue on
+ if (sess.containsAttribute(Constants.AUTHENTICATED)) {
+ nextFilter.messageReceived(sess, obj);
+ return;
+ }
+
+ //Oh goody...someone might be authenticating
+
+ //If the session has a remote public key, then it's an Auth Command
+ if (sess.containsAttribute(Constants.REMOTE_PUBLIC_KEY)) {
+ handleAuth(sess, obj);
+ } else {
+ //No Remote Public Key, so this should be a handshake
+ handleHandShake(sess, obj);
+ }
+
+ //Consume the message (don't pass it on)
+ }
+
+ /**
+ * Second step: checks the credentials of an AuthCommand. On success the
+ * session is marked authenticated, registered with the endpoint manager
+ * and, when the ack filter is part of the chain, acknowledged. Any
+ * failure closes the session.
+ */
+ private void handleAuth(IoSession sess, Object obj) throws Exception {
+ if (!(obj instanceof AuthCommand)) {
+ //Nope...buh-bye...
+ log.error("Expected AuthCommand but got "
+ + obj.getClass().getSimpleName() + " from "
+ + sess.getRemoteAddress());
+ sess.close();
+ return;
+ }
+
+ ScheduledFuture authTask = (ScheduledFuture) sess
+ .getAttribute(Constants.AUTH_TASK);
+
+ //Cancel the timer
+ if (!authTask.cancel(false)) {
+ //Can't cancel because it has run or is running...too late!
+ return;
+ }
+
+ //Pull and test the credentials
+ AuthCommand auth = (AuthCommand) obj;
+ String authUserId = auth.getUserId();
+ if (log.isDebugEnabled()) {
+ log.debug("User Id read was '" + authUserId + "' from " + sess.getRemoteAddress());
+ }
+
+ //The password is deliberately never logged, not even at debug level
+ if (!userId.equals(authUserId) || !password.equals(auth.getPassword())) {
+ log.error("Authentication failure for "
+ + sess.getRemoteAddress());
+ sess.close();
+ return;
+ }
+
+ //If we got here then authentication passed
+
+ //Clear the AUTH_TASK
+ sess.removeAttribute(Constants.AUTH_TASK);
+
+ //Register authentication
+ sess.setAttribute(Constants.AUTHENTICATED);
+
+ //Now add the client to the cache to start receiving events
+ CacheInfoHolder infoHolder = ((TCPSocketHandler) sess.getHandler())
+ .getInfoHolder();
+ infoHolder.getEndpointManager().addEndpoint(sess);
+
+ //See if we need to send an Ack
+ if (sess.getFilterChain().contains(TCPMessageAckCommandFilter.NAME)) {
+ log.debug("Sending MessageAck");
+ MessageAckCommand ack = new MessageAckCommand();
+ ack.setMessageId(auth.getCommandId());
+ sess.write(ack);
+ }
+
+ }
+
+ /**
+ * First step: stores the public key of a HandShakeCommand on the
+ * session, answers with our own public key and starts the timeout for
+ * the login that has to follow.
+ *
+ * @throws IllegalArgumentException if the handshake carries no key; the
+ * session is closed before the exception is thrown
+ */
+ private void handleHandShake(IoSession sess, Object obj) throws Exception {
+ ScheduledFuture handshakeTask = (ScheduledFuture) sess
+ .getAttribute(Constants.AUTH_TASK);
+
+ if (!(obj instanceof HandShakeCommand)) {
+ log.error("Expected HandShakeCommand but got "
+ + obj.getClass().getSimpleName() + " from "
+ + sess.getRemoteAddress());
+ //Nope...buh-bye...
+ sess.close();
+ return;
+ }
+
+ //Cancel the timer
+ if (!handshakeTask.cancel(false)) {
+ //Can't cancel because it has run or is running...too late!
+ return;
+ }
+
+ HandShakeCommand handShake = (HandShakeCommand) obj;
+ PublicKey key = handShake.getPublicKey();
+ if (key == null) {
+ //The timer is already cancelled, so close the session here;
+ //otherwise nothing would ever time this client out
+ sess.close();
+ throw new IllegalArgumentException(
+ "Handshake did not contain a key");
+ }
+
+ //Store the remote's public key
+ sess.setAttribute(Constants.REMOTE_PUBLIC_KEY, key);
+
+ //Send out our public key
+ PublicKeyCommand keyCommand = new PublicKeyCommand();
+ keyCommand.setPublicKey(CipherUtil.publicKey);
+
+ sess.write(keyCommand);
+
+ //Now schedule a timeout for authorization
+ sess.setAttribute(Constants.AUTH_TASK, scheduleTimeout(sess));
+
+ }
+
+ /**
+ * Schedules a TimeoutTask for the session on the session's handler.
+ *
+ * @return the future of the scheduled task, used to cancel it later
+ */
+ private ScheduledFuture scheduleTimeout(IoSession sess) {
+ return ((TCPSocketHandler) sess.getHandler()).schedule(
+ new TimeoutTask(sess), RESPONSE_TIMEOUT_MILLIS);
+ }
+
+ /**
+ * Closes a session whose handshake or login did not arrive in time.
+ */
+ class TimeoutTask implements Runnable {
+
+ private final IoSession sess;
+
+ public TimeoutTask(IoSession sess) {
+ this.sess = sess;
+ }
+
+ public void run() {
+ log.error("Timeout waiting for Handshake or Login from "
+ + sess.getRemoteAddress()
+ + ", removing client.");
+ //Close the session, it's no good since it cannot authenticate
+ sess.close();
+ }
+
+ }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPAuthenticationFilter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,77 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+/**
+ * MINA filter that keeps the book-keeping for bulk transfers. For a command
+ * whose id has a bulk marker stored on the session, the matching counter is
+ * decremented; once the counter has reached zero the bulk attributes are
+ * removed and, when the ack filter is part of the chain, a
+ * {@link MessageAckCommand} is written back. Every message is forwarded
+ * down the chain afterwards. The code that stores the bulk attributes on
+ * the session is not part of this class.
+ */
+public class TCPBulkCommandFilter extends IoFilterAdapter {
+
+ public final static Log log = LogFactory.getLog(TCPBulkCommandFilter.class);
+
+ public static final String NAME = "BulkCommandFilter";
+
+ /**
+ * Updates the bulk state kept for the command id of the received
+ * message, then forwards the message.
+ */
+ @Override
+ public void messageReceived(NextFilter nextFilter, IoSession sess,
+ Object obj) throws Exception {
+
+ //Something that is not a command cannot belong to a bulk transfer;
+ //hand it on instead of failing with a ClassCastException
+ if (!(obj instanceof BaseCommand)) {
+ nextFilter.messageReceived(sess, obj);
+ return;
+ }
+
+ BaseCommand command = (BaseCommand) obj;
+
+ //Both attributes are keyed by the id of the command
+ String bulkCountKey = Constants.BULK_COUNT + command.getCommandId();
+ String bulkCommandIdKey = Constants.BULK_COMMAND_ID
+ + command.getCommandId();
+
+ //Check to see if we are processing a bulk command
+ if (!sess.containsAttribute(bulkCommandIdKey)) {
+ //Not a bulk command, so process the command
+ nextFilter.messageReceived(sess, obj);
+ return;
+ }
+
+ Integer bulkCount = (Integer) sess.getAttribute(bulkCountKey);
+
+ //If the bulk command hit zero, remove the BULK attributes. A missing
+ //counter is handled the same way so it cannot fail on unboxing.
+ if (bulkCount == null || bulkCount.intValue() == 0) {
+
+ //Remove the attributes
+ sess.removeAttribute(bulkCountKey);
+ sess.removeAttribute(bulkCommandIdKey);
+
+ //Send the ack command back if required
+ if (sess.getFilterChain().contains(TCPMessageAckCommandFilter.NAME)) {
+ MessageAckCommand ack = new MessageAckCommand();
+ //Same setter the authentication filter uses for its ack
+ ack.setMessageId(command.getCommandId());
+ //Send the Ack
+ sess.write(ack);
+ }
+
+ } else {
+ //Decrease the command count
+ sess.setAttribute(bulkCountKey, Integer.valueOf(bulkCount.intValue() - 1));
+ }
+
+ //Process the command
+ nextFilter.messageReceived(sess, obj);
+ }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPBulkCommandFilter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
+
+/**
+ * Codec factory for the gcache TCP transport. It registers the command
+ * decoder and encoder classes of this package with the demuxing codec
+ * factory it extends.
+ */
+public class TCPCommandProtocolCodecFactory extends DemuxingProtocolCodecFactory {
+
+ /**
+ * Registers TCPCommandRequestDecoder and TCPCommandRequestEncoder.
+ */
+ public TCPCommandProtocolCodecFactory() {
+ super.register(TCPCommandRequestDecoder.class);
+ super.register(TCPCommandRequestEncoder.class);
+ }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,153 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.demux.MessageDecoderAdapter;
+import org.apache.mina.filter.codec.demux.MessageDecoderResult;
+
+/**
+ * Decodes gcache commands from the TCP stream. A command on the wire is a
+ * header followed by the command data:
+ *
+ * <pre>
+ * MAGIC HEADER   - 6 bytes
+ * COMMAND        - 1 byte
+ * COMMAND LENGTH - 4 bytes
+ * COMMAND        - as many bytes as COMMAND LENGTH says
+ * </pre>
+ */
+public class TCPCommandRequestDecoder extends MessageDecoderAdapter {
+
+ private static final Log log = LogFactory.getLog(TCPCommandRequestDecoder.class);
+
+ /**
+ * Tells whether the buffer holds a complete command.
+ *
+ * @return OK when a whole command is buffered, NEED_DATA when more bytes
+ * are required, NOT_OK when the header is invalid or could not
+ * be inspected
+ */
+ public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
+ try {
+ return messageComplete(in);
+ } catch (Exception ex) {
+ //Report through the logger instead of printing to stderr
+ log.error("Unable to inspect the command header", ex);
+ }
+
+ return MessageDecoderResult.NOT_OK;
+ }
+
+ /**
+ * Reads one command from the buffer and writes it to the decoder
+ * output. The header is only skipped over here; it is validated in
+ * {@link #decodable(IoSession, ByteBuffer)}.
+ */
+ public MessageDecoderResult decode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception {
+
+ // Skip the magic
+ byte magic[] = new byte[Constants.MAGIC.length];
+ in.get(magic);
+
+ // Get the command
+ byte commandIdentifier = in.get();
+
+ // Skip the command length, the command reads its own data
+ in.getInt();
+
+ Command command = CommandTypes.createCommand(commandIdentifier);
+ if (log.isDebugEnabled()) {
+ log.debug("Command is type " + command.getClass().getSimpleName() + " from " + session.getRemoteAddress());
+ }
+
+ command.readExternal(in);
+
+ out.write(command);
+
+ return MessageDecoderResult.OK;
+ }
+
+ /**
+ * Validates the header at the current position of the buffer and checks
+ * that the announced command data is available.
+ */
+ private MessageDecoderResult messageComplete(ByteBuffer in)
+ throws Exception {
+
+ // Return NEED_DATA if the whole header is not read yet
+ if (in.remaining() < Constants.HEADER_SIZE) {
+ return MessageDecoderResult.NEED_DATA;
+ }
+
+ // Read the magic
+ byte magic[] = new byte[Constants.MAGIC.length];
+ in.get(magic);
+
+ // Better match the Magic
+ if (!Arrays.equals(Constants.MAGIC, magic)) {
+ // Not our protocol. Reject the data instead of carrying on as if
+ // the magic had been found.
+ log.warn("Magic did not match!");
+ return MessageDecoderResult.NOT_OK;
+ }
+
+ log.debug("Magic found");
+
+ // Get the command
+ byte commandIdentifier = in.get();
+ if (log.isDebugEnabled()) {
+ log.debug("Command Identifier = " + commandIdentifier);
+ }
+
+ // Get the command length
+ int length = in.getInt();
+ if (log.isDebugEnabled()) {
+ log.debug("Command length = " + length);
+ }
+
+ // A negative length can never be satisfied
+ if (length < 0) {
+ log.warn("Invalid command length " + length);
+ return MessageDecoderResult.NOT_OK;
+ }
+
+ // Be sure we have all of the data we need. More data than that is
+ // fine, the buffer may already hold the start of the next command.
+ if (in.remaining() < length) {
+ return MessageDecoderResult.NEED_DATA;
+ }
+
+ return MessageDecoderResult.OK;
+ }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.AuthCommand;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.BulkSendCommand;
+import org.apache.geronimo.gcache.command.ClearCacheCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.GetCacheCommand;
+import org.apache.geronimo.gcache.command.HandShakeCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.geronimo.gcache.command.PublicKeyCommand;
+import org.apache.geronimo.gcache.command.PutEntryCommand;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.apache.geronimo.gcache.command.RemoveEntryCommand;
+import org.apache.geronimo.gcache.command.RemoveSessionCommand;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+
+public class TCPCommandRequestEncoder implements MessageEncoder {
+
+ private static Log log = LogFactory.getLog(TCPCommandRequestEncoder.class);
+
+ private static final Set TYPES;
+
+ static
+ {
+ Set<Class> types= new HashSet<Class> ();
+ types.add( AuthCommand.class );
+ types.add( BulkSendCommand.class );
+ types.add( ClearCacheCommand.class );
+ types.add( GetCacheCommand.class );
+ types.add( HandShakeCommand.class );
+ types.add( MessageAckCommand.class );
+ types.add( PublicKeyCommand.class );
+ types.add( PutEntryCommand.class );
+ types.add( PutSessionCommand.class );
+ types.add( RemoveEntryCommand.class );
+ types.add( RemoveSessionCommand.class );
+ TYPES = Collections.unmodifiableSet( types );
+ }
+
+ public void encode(IoSession sess, Object obj, ProtocolEncoderOutput out) throws Exception {
+
+ if (log.isDebugEnabled()){
+ log.debug("Sending " + obj.getClass().getSimpleName() + " to " + sess.getRemoteAddress().toString());
+ }
+ Command command = (Command)obj;
+ out.write(ByteBuffer.wrap(((BaseCommand)command).createPacket(true)));
+ }
+
+ public Set getMessageTypes() {
+ // TODO Auto-generated method stub
+ return TYPES;
+ }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
+
+/**
+ * MINA filter that tracks outstanding message acknowledgements. A timer is
+ * scheduled for every command that requires an ack; when the matching
+ * {@link MessageAckCommand} arrives the timer is cancelled, otherwise the
+ * timer fires and closes the session.
+ */
+public class TCPMessageAckCommandFilter extends IoFilterAdapter {
+
+    private static final Log log = LogFactory
+            .getLog(TCPMessageAckCommandFilter.class);
+
+    public final static String NAME = "MessageAckCommandFilter";
+
+    /** Delay handed to the handler's scheduler for each pending ack. */
+    private final long ackTimeout;
+
+    /**
+     * @param ackTimeout delay passed to {@code TCPSocketHandler.schedule} for each
+     *                   ack timer
+     */
+    public TCPMessageAckCommandFilter(long ackTimeout) {
+        if (log.isDebugEnabled()) {
+            log.debug("TCPMessageAckFilter created with a timeout of " + ackTimeout);
+        }
+        this.ackTimeout = ackTimeout;
+    }
+
+    /**
+     * Cancels the pending ack timer when a {@link MessageAckCommand} is received,
+     * then passes the message on to the next filter. If the timer can no longer
+     * be cancelled the message is dropped.
+     */
+    @Override
+    public void messageReceived(NextFilter nextFilter, IoSession sess,
+            Object obj) throws Exception {
+
+        //Look for only MessageAckCommands
+        if (obj instanceof MessageAckCommand) {
+
+            MessageAckCommand command = (MessageAckCommand) obj;
+            Long commandId = command.getMessageId();
+
+            //Remove (rather than just read) the timer handle so that acked
+            //commands do not pile up as session attributes
+            ScheduledFuture handle = (ScheduledFuture) sess.removeAttribute(Constants.MESSAGE_ACK_ID + commandId);
+            if (handle != null && !handle.cancel(false)) {
+                //Too late, it's firing...this connection is done
+                return;
+            }
+
+            log.debug("MessageAck received.");
+
+            //The ack is not consumed here; it is still forwarded down the chain
+        }
+
+        //Process the command
+        nextFilter.messageReceived(sess, obj);
+    }
+
+    /**
+     * Schedules a timeout task for the given command and stores its handle on
+     * the session so a later ack can cancel it.
+     *
+     * @param commandId id of the command awaiting acknowledgement
+     * @param sess      session the command was sent on; its handler must be a
+     *                  {@link TCPSocketHandler}
+     */
+    public void requestAck(long commandId, IoSession sess) {
+        AckTask task = new AckTask(commandId, sess);
+        TCPSocketHandler handler = (TCPSocketHandler) sess.getHandler();
+        ScheduledFuture handle = handler.schedule(task, ackTimeout);
+        sess.setAttribute(Constants.MESSAGE_ACK_ID + commandId, handle);
+    }
+
+    /**
+     * Task run when no ack arrived in time: logs the timeout and closes the session.
+     */
+    class AckTask implements Runnable {
+
+        private final long commandId;
+
+        private final IoSession sess;
+
+        public AckTask(long commandId, IoSession sess) {
+            this.commandId = commandId;
+            this.sess = sess;
+        }
+
+        public void run() {
+            log.error("Timeout waiting for Message ack for commandid="
+                    + commandId + ", removing client.");
+            //Close the session, it's no good since it cannot ack the message
+            sess.close();
+        }
+
+    }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPMessageAckCommandFilter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,100 @@
+/*
+ *
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.SessionLog;
+
+import edu.emory.mathcs.backport.java.util.concurrent.Executors;
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledFuture;
+import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+
+/**
+ * MINA session handler for the gcache TCP transport. Every received message
+ * is treated as a {@link Command} and executed against the command visitor
+ * held by the {@link CacheInfoHolder}. The handler also owns the scheduler
+ * used to run delayed tasks.
+ */
+public class TCPSocketHandler extends IoHandlerAdapter {
+
+    public final static int DEFAULT_THREAD_POOL_SIZE = 10;
+    private final CacheInfoHolder infoHolder;
+    private final ScheduledThreadPoolExecutor scheduler;
+
+    /**
+     * Creates a handler whose scheduler uses {@link #DEFAULT_THREAD_POOL_SIZE} threads.
+     */
+    public TCPSocketHandler(final CacheInfoHolder infoHolder) {
+        this(infoHolder, DEFAULT_THREAD_POOL_SIZE);
+    }
+
+    /**
+     * Creates a handler and starts its scheduler, including a recurring job
+     * that purges cancelled tasks every 10 seconds.
+     *
+     * @param infoHolder     holder supplying the command visitor and endpoint manager
+     * @param threadPoolSize number of threads given to the scheduler
+     */
+    public TCPSocketHandler(final CacheInfoHolder infoHolder, int threadPoolSize) {
+        this.infoHolder = infoHolder;
+        this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize);
+
+        // Cancelled tasks stay queued until purged; sweep them periodically
+        // so they do not accumulate in memory.
+        Runnable purger = new SchedulePurger();
+        this.scheduler.scheduleWithFixedDelay(purger, 10000, 10000, TimeUnit.MILLISECONDS);
+    }
+
+    public CacheInfoHolder getInfoHolder() {
+        return infoHolder;
+    }
+
+    /**
+     * Attaches the originating session to the command and executes it.
+     */
+    @Override
+    public void messageReceived(IoSession session, Object obj) throws Exception {
+        final Command received = (Command) obj;
+        received.setAttachment(session);
+        received.execute(infoHolder.getCommandVisitor());
+    }
+
+    /**
+     * Logs the failure, unregisters the session's endpoint and closes the session.
+     */
+    @Override
+    public void exceptionCaught(IoSession sess, Throwable cause) throws Exception {
+        SessionLog.error(sess, "", cause);
+        infoHolder.getEndpointManager().removeEndpoint(sess);
+        sess.close();
+    }
+
+    /**
+     * Unregisters the endpoint that belonged to the closed session.
+     */
+    @Override
+    public void sessionClosed(IoSession sess) throws Exception {
+        infoHolder.getEndpointManager().removeEndpoint(sess);
+    }
+
+    /**
+     * Stops the scheduler immediately.
+     */
+    public void destroy() {
+        scheduler.shutdownNow();
+    }
+
+    /**
+     * Schedules a one-shot task.
+     *
+     * @param task  work to run
+     * @param delay delay in milliseconds before the task runs
+     * @return handle that can be used to cancel the task
+     */
+    public ScheduledFuture schedule(Runnable task, long delay) {
+        return scheduler.schedule(task, delay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Recurring job that drops cancelled tasks from the scheduler's queue.
+     */
+    class SchedulePurger implements Runnable {
+
+        public void run() {
+            scheduler.purge();
+        }
+
+    }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,171 @@
+/*
+ *
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.InetAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.transports.TransportService;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.filter.LoggingFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
+
+public class TCPSocketTransportService extends TransportService {
+
+ private final static Log log = LogFactory.getLog(TCPSocketTransportService.class);
+
+ public static final int DEFAULT_PORT = 4123;
+ public static final long DEFAULT_ACK_TIMEOUT = 10000;
+ public final static String DEFAULT_USERNAME = "system";
+ public final static String DEFAULT_PASSWORD = "manager";
+
+ IoAcceptor acceptor = null;
+ private URI tcpURI;
+ private boolean requireMessageAck = false;
+ private boolean enableLogging = false;
+ private long ackTimeout = DEFAULT_ACK_TIMEOUT;
+ private TCPSocketHandler handler = null;
+ private String userId = DEFAULT_USERNAME;
+ private String password = DEFAULT_PASSWORD;
+
+ public String getService() throws Exception{
+ if (tcpURI == null){
+ throw new Exception("TCP URI has not been set.");
+ }
+
+ String host = tcpURI.getHost();
+ if (host.equals("0.0.0.0")){
+ host = InetAddress.getLocalHost().getHostAddress().toString();
+ }
+ return tcpURI.getScheme() + "://" + host + ":" + tcpURI.getPort();
+ }
+
+ public URI getTcpURI() {
+ return tcpURI;
+ }
+
+ public void setTcpURI(URI tcpURI) throws Exception{
+ int port = tcpURI.getPort();
+ if (port == -1){
+ port = DEFAULT_PORT;
+ }
+ if (tcpURI.getHost().equals("default")){
+ String query = tcpURI.getQuery();
+ String uri = tcpURI.getScheme() + "://" + "0.0.0.0" + ":" + port + (query == null ? "" : "?" + query);
+ this.tcpURI = new URI(uri);
+ } else {
+ this.tcpURI = tcpURI;
+ }
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getUserId() {
+ return userId;
+ }
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public boolean isRequireMessageAck() {
+ return requireMessageAck;
+ }
+
+ public void setRequireMessageAck(boolean requireMessageAck) {
+ this.requireMessageAck = requireMessageAck;
+ }
+
+ public long getAckTimeout() {
+ return ackTimeout;
+ }
+
+ public void setAckTimeout(long ackTimeout) {
+ this.ackTimeout = ackTimeout;
+ }
+
+ public boolean isEnableLogging() {
+ return enableLogging;
+ }
+
+ public void setEnableLogging(boolean enableLogging) {
+ this.enableLogging = enableLogging;
+ }
+
+ public void start() throws Exception {
+ int port = tcpURI.getPort();
+ String host = tcpURI.getHost();
+
+ InetSocketAddress inet = new InetSocketAddress(host, port);
+
+ acceptor = new SocketAcceptor();
+ SocketAcceptorConfig cfg = new SocketAcceptorConfig();
+ cfg.setReuseAddress(true);
+
+ //If we requested logging, add that too
+ if (enableLogging) {
+ cfg.getFilterChain().addLast("logFilter", new LoggingFilter());
+ }
+
+ //Add the filter to hande the GCache Codec
+ cfg.getFilterChain().addLast("protocolFilter", new ProtocolCodecFilter(new TCPCommandProtocolCodecFactory()));
+
+ //Add the authentication filter
+ cfg.getFilterChain().addLast(TCPAuthenticationFilter.NAME, new TCPAuthenticationFilter(userId, password));
+
+ //Add the BulkCommandFilter for filtering when a BulkCommand is being read
+ cfg.getFilterChain().addLast(TCPBulkCommandFilter.NAME, new TCPBulkCommandFilter());
+
+ //If we require message acks, then set up the message ack filter
+ if (requireMessageAck) {
+ cfg.getFilterChain().addLast(TCPMessageAckCommandFilter.NAME, new TCPMessageAckCommandFilter(ackTimeout));
+ }
+
+
+ handler = new TCPSocketHandler(info);
+ acceptor.bind(inet, handler, cfg);
+ }
+
+ public void stop() throws Exception {
+
+ if (handler != null) {
+ try {
+ handler.destroy();
+ handler = null;
+ } catch (Exception e) {
+ log.error(e);
+ //Ignore since there is not much that can be done
+ }
+ }
+ if (acceptor != null) {
+ acceptor.unbindAll();
+ acceptor = null;
+ }
+ }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportService.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java?view=auto&rev=477271
==============================================================================
--- geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java (added)
+++ geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java Mon Nov 20 09:52:40 2006
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.transports.TransportFactory;
+import org.apache.geronimo.gcache.transports.TransportService;
+import org.apache.geronimo.gcache.util.IOExceptionSupport;
+import org.apache.geronimo.gcache.util.IntrospectionSupport;
+import org.apache.geronimo.gcache.util.URISupport;
+
+/**
+ * Factory that builds {@link TCPSocketTransportService} instances from a
+ * transport URI.
+ */
+public class TCPSocketTransportServiceFactory extends TransportFactory {
+
+    /**
+     * Creates a TCP transport service for the given URI and cache info, then
+     * applies the parameters parsed from the URI to the service as properties.
+     *
+     * @param uri  transport URI; its parsed parameters become service properties
+     * @param info cache info handed to the new service
+     * @return the configured (not yet started) transport service
+     * @throws IOException wrapping any failure raised while building the service
+     */
+    @Override
+    protected TransportService doCreateTransportService(URI uri, CacheInfoHolder info)
+            throws IOException {
+        try {
+            final Map uriParameters = URISupport.parseParamters(uri);
+
+            final TCPSocketTransportService service = new TCPSocketTransportService();
+            service.setInfo(info);
+            service.setTcpURI(uri);
+            IntrospectionSupport.setProperties(service, uriParameters);
+
+            return service;
+        } catch (Throwable e) {
+            throw IOExceptionSupport.create("Could not create tcp transport service: " + uri, e);
+        }
+    }
+
+}
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/common/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServiceFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheManager.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheManager.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheManager.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheManager.java Mon Nov 20 09:52:40 2006
@@ -18,18 +18,22 @@
*/
package org.apache.geronimo.gcache.server;
-import org.apache.commons.cli.*;
-import org.apache.geronimo.gcache.dd.ConfigBuilder;
-import org.apache.geronimo.gcache.dd.GcacheConfiguration;
-import org.apache.geronimo.gcache.CacheInfoHolder;
-import org.apache.geronimo.gcache.transports.tcp.TCPCacheNotifier;
-import org.apache.geronimo.gcache.transports.network.ConnectionManager;
-
import java.io.File;
-import java.net.URI;
import net.sf.ehcache.CacheManager;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.dd.ConfigBuilder;
+import org.apache.geronimo.gcache.dd.GcacheConfiguration;
+import org.apache.geronimo.gcache.transports.network.ConnectionManager;
+import org.apache.geronimo.gcache.transports.tcp.TCPCacheNotifier;
+import org.apache.geronimo.gcache.transports.tcp.TCPCommandVisitor;
+
public class GCacheManager {
private static final String OPT_CONFIG = "c";
@@ -50,6 +54,8 @@
//Create a cache with the configuration from the gcache-config file
cacheMgr = new CacheManager(config.getEhcacheConfig());
info = new CacheInfoHolder(cacheMgr);
+ info.setCommandVisitor(new TCPCommandVisitor(info));
+ info.setCacheNotifier(new TCPCacheNotifier());
connectionManager = new ConnectionManager(info);
connectionManager.setTransportURI(config.getTransportUri());
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCacheNotifier.java Mon Nov 20 09:52:40 2006
@@ -28,7 +28,6 @@
import org.apache.geronimo.gcache.command.RemoveEntryCommand;
import org.apache.geronimo.gcache.command.RemoveSessionCommand;
import org.apache.geronimo.gcache.listeners.CacheNotifier;
-import org.apache.geronimo.gcache.transports.Endpoint;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
@@ -68,31 +67,26 @@
return;
}
- TCPEndpoint origEndpoint = (TCPEndpoint) command.getAttachment();
+ IoSession origEndpoint = (IoSession) command.getAttachment();
//Spin through the client list
- Set<Endpoint> set = info.getEndpointManager().getEndpoints();
- synchronized (set) {
- for (Endpoint endpoint : set) {
- TCPEndpoint tcp = (TCPEndpoint) endpoint;
-
- //Don't update from whence it came
- if (origEndpoint != null && origEndpoint.equals(tcp)) {
- continue;
- }
-
- IoSession sess = tcp.getIoSession();
- sess.write(buffer);
-
- //See if we need to request an Ack
- TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) sess
- .getFilterChain().get(TCPMessageAckCommandFilter.NAME);
- if (filter != null) {
- long commandId = command.getCommandId();
- filter.requestAck(commandId, sess);
- }
+ Set<IoSession> set = info.getEndpointManager().getEndpoints();
+ for (IoSession endpoint : set) {
+ //Don't update from whence it came
+ if (origEndpoint != null && origEndpoint.equals(endpoint)) {
+ continue;
}
+
+ endpoint.write(buffer);
+
+ //See if we need to request an Ack
+ TCPMessageAckCommandFilter filter = (TCPMessageAckCommandFilter) endpoint.getFilterChain().get(TCPMessageAckCommandFilter.NAME);
+ if (filter != null) {
+ long commandId = command.getCommandId();
+ filter.requestAck(commandId, endpoint);
+ }
+
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Mon Nov 20 09:52:40 2006
@@ -47,13 +47,7 @@
private CacheInfoHolder infoHolder;
- private TCPEndpoint endpoint;
-
- private IoSession sess;
-
- public TCPCommandVisitor(CacheInfoHolder infoHolder, IoSession sess) {
- this.sess = sess;
- endpoint = new TCPEndpoint(sess);
+ public TCPCommandVisitor(CacheInfoHolder infoHolder) {
this.infoHolder = infoHolder;
}
@@ -66,7 +60,6 @@
cache.remove(command.getSessionId());
}
- command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyRemoveSession(command);
sendAck(command);
}
@@ -100,7 +93,6 @@
cache.remove(command.getHashableKey());
}
- command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyRemove(command);
sendAck(command);
@@ -114,7 +106,6 @@
cache.put(new Element(command.getSessionId(), command
.getRawSessionFromPayload()));
- command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyPutSession(command);
sendAck(command);
@@ -155,7 +146,6 @@
.getRawPayload()));
}
- command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyPut(command);
sendAck(command);
}
@@ -168,7 +158,7 @@
public void processGetCache(GetCacheCommand command) {
Cache cache = infoHolder.getCache(command.getCacheName(), true);
- IoSession sess = endpoint.getIoSession();
+ IoSession sess = (IoSession)command.getAttachment();
//Send a bulk command
BulkSendCommand bulk = new BulkSendCommand();
@@ -224,7 +214,6 @@
Cache cache = infoHolder.getCache(command.getCacheName(), true);
cache.removeAll();
- command.setAttachment(endpoint);
infoHolder.getCacheNotifier().notifyClearCache(command);
sendAck(command);
@@ -235,6 +224,7 @@
//Get the command count and set the attribute to count em down
int commandCount = command.getNumberOfCommands();
+ IoSession sess = (IoSession)command.getAttachment();
if (sess == null)
return;
@@ -249,6 +239,7 @@
private void sendAck(BaseCommand command) {
+ IoSession sess = (IoSession)command.getAttachment();
if (sess == null)
return;
@@ -261,6 +252,7 @@
private void requestAck(BaseCommand command) {
+ IoSession sess = (IoSession)command.getAttachment();
if (sess == null)
return;
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/server/GCacheManagerTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/server/GCacheManagerTest.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/server/GCacheManagerTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/server/GCacheManagerTest.java Mon Nov 20 09:52:40 2006
@@ -18,14 +18,6 @@
*/
package org.apache.geronimo.gcache.server;
-import org.apache.geronimo.gcache.command.*;
-import org.apache.geronimo.gcache.transports.tcp.Constants;
-import org.apache.geronimo.gcache.util.CipherUtil;
-import org.apache.mina.common.ByteBuffer;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
@@ -33,6 +25,20 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+
+import org.apache.geronimo.gcache.command.AuthCommand;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.geronimo.gcache.command.HandShakeCommand;
+import org.apache.geronimo.gcache.command.PublicKeyCommand;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.apache.geronimo.gcache.transports.tcp.Constants;
+import org.apache.geronimo.gcache.util.CipherUtil;
+import org.apache.mina.common.ByteBuffer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
/**
* Test the GCachManager (the server) - no MessageAcks
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractService.java Mon Nov 20 09:52:40 2006
@@ -56,6 +56,9 @@
info = new CacheInfoHolder(mgr);
URI uri = new URI(protocol);
+
+ info.setCommandVisitor(new TCPCommandVisitor(info));
+ info.setCacheNotifier(new TCPCacheNotifier());
server = (TCPSocketTransportService)TCPSocketTransportServiceFactory.createTransportService(uri, info);
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java Mon Nov 20 09:52:40 2006
@@ -22,48 +22,48 @@
import org.apache.geronimo.gcache.command.BulkSendCommand;
import org.apache.geronimo.gcache.command.Command;
import org.apache.geronimo.gcache.command.GetCacheCommand;
-import org.apache.geronimo.gcache.transports.Endpoint;
+import org.apache.mina.common.IoSession;
import org.testng.annotations.Test;
public class TCPEndpointTest extends AbstractService {
-
+
@Test
- public void testJoinCluster() throws Exception{
-
+ public void testJoinCluster() throws Exception {
+
//Should be 2 client end points
assert 2 == info.getEndpointManager().size();
- GetCacheCommand command = new GetCacheCommand();
-
- command.setCacheName("Cache1");
-
+ GetCacheCommand command = new GetCacheCommand();
+
+ command.setCacheName("Cache1");
+
//Send the packet
- sendCommand(client, command);
-
+ sendCommand(client, command);
+
//Now receive any data (it Should be a BulkSendCommand)
- Command bulk = this.readCommand(client);
+ Command bulk = this.readCommand(client);
//Is the message the type we think it is?
assert bulk instanceof BulkSendCommand;
- int commandsToFollow = ((BulkSendCommand)bulk).getNumberOfCommands();
-
+ int commandsToFollow = ((BulkSendCommand) bulk).getNumberOfCommands();
+
//Nothing is in the Cache, so no commands should follow
- assert commandsToFollow == 0;
-
+ assert commandsToFollow == 0;
+
client.close();
-
+
//Give the server some time to figure out the client closed the connection
Thread.sleep(1000);
-
+
//The endpoint should have been removed
- Set<Endpoint>set = info.getEndpointManager().getEndpoints();
+ Set<IoSession> set = info.getEndpointManager().getEndpoints();
assert set.size() == 1;
-
+
client2.close();
-
+
//Give the server some time to figure out the client closed the connection
Thread.sleep(1000);
-
+
set = info.getEndpointManager().getEndpoints();
assert set.size() == 0;
}
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorClearCacheCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorClearCacheCommandTest.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorClearCacheCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorClearCacheCommandTest.java Mon Nov 20 09:52:40 2006
@@ -61,7 +61,7 @@
command.setCacheName(cacheName1);
CacheInfoHolder info = new CacheInfoHolder(cacheMgr);
- TCPCommandVisitor visitor = new TCPCommandVisitor(info, null);
+ TCPCommandVisitor visitor = new TCPCommandVisitor(info);
command.execute(visitor);
//Cache1 should be empty
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutEntryCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutEntryCommandTest.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutEntryCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutEntryCommandTest.java Mon Nov 20 09:52:40 2006
@@ -50,7 +50,7 @@
command.setPayload(data);
CacheInfoHolder info = new CacheInfoHolder(cacheMgr);
- TCPCommandVisitor visitor = new TCPCommandVisitor(info, null);
+ TCPCommandVisitor visitor = new TCPCommandVisitor(info);
command.execute(visitor);
Cache cache = cacheMgr.getCache(cacheName);
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutSessionCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutSessionCommandTest.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutSessionCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorPutSessionCommandTest.java Mon Nov 20 09:52:40 2006
@@ -84,7 +84,7 @@
assert cache.getSize() == 0;
//Execute the command and it should repopulate the cache
- TCPCommandVisitor visitor = new TCPCommandVisitor(info, null);
+ TCPCommandVisitor visitor = new TCPCommandVisitor(info);
command.execute(visitor);
//Test that the cache was repopulated
@@ -112,7 +112,7 @@
command.setSessionId(sessionId);
command.setPayloadFromSession(sessionMap);
- TCPCommandVisitor visitor = new TCPCommandVisitor(info, null);
+ TCPCommandVisitor visitor = new TCPCommandVisitor(info);
command.execute(visitor);
//Test the cache got populated
@@ -157,7 +157,7 @@
command.setSessionId(sessionId);
command.setPayload(data);
- TCPCommandVisitor visitor = new TCPCommandVisitor(info, null);
+ TCPCommandVisitor visitor = new TCPCommandVisitor(info);
command.execute(visitor);
}
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveEntryCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveEntryCommandTest.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveEntryCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveEntryCommandTest.java Mon Nov 20 09:52:40 2006
@@ -64,7 +64,7 @@
//Execute the command
CacheInfoHolder info = new CacheInfoHolder(cacheMgr);
- TCPCommandVisitor visitor = new TCPCommandVisitor(info, null);
+ TCPCommandVisitor visitor = new TCPCommandVisitor(info);
command.execute(visitor);
//Check the cache
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveSessionCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveSessionCommandTest.java?view=diff&rev=477271&r1=477270&r2=477271
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveSessionCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/VisitorRemoveSessionCommandTest.java Mon Nov 20 09:52:40 2006
@@ -64,7 +64,7 @@
//Execute the command
CacheInfoHolder info = new CacheInfoHolder(cacheMgr);
- TCPCommandVisitor visitor = new TCPCommandVisitor(info, null);
+ TCPCommandVisitor visitor = new TCPCommandVisitor(info);
command.execute(visitor);
//Check the cache