You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/10 01:51:04 UTC

svn commit: r454556 - in /geronimo/sandbox/gcache/server/src: main/java/org/apache/geronimo/gcache/ main/java/org/apache/geronimo/gcache/transports/ main/java/org/apache/geronimo/gcache/transports/tcp/ test/java/org/apache/geronimo/gcache/transports/tcp/

Author: jgenender
Date: Mon Oct  9 16:51:03 2006
New Revision: 454556

URL: http://svn.apache.org/viewvc?view=rev&rev=454556
Log:
Add the Endpoint processing

Added:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java   (with props)
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java   (with props)
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java   (with props)
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java   (with props)
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java   (with props)
Modified:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java Mon Oct  9 16:51:03 2006
@@ -18,17 +18,25 @@
  */
 package org.apache.geronimo.gcache;
 
-import net.sf.ehcache.CacheManager;
 import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+
 import org.apache.geronimo.gcache.server.listeners.CacheNotifier;
 import org.apache.geronimo.gcache.server.listeners.DefaultCacheNotifier;
+import org.apache.geronimo.gcache.transports.EndpointManager;
 
 public class CacheInfoHolder {
     private final CacheManager cacheManager;
     private CacheNotifier cacheNotifier = null;
+    private EndpointManager endpointManager;
 
     public CacheInfoHolder(CacheManager cacheManager) {
         this.cacheManager = cacheManager;
+        this.endpointManager = new EndpointManager();
+    }
+
+    public EndpointManager getEndpointManager() {
+        return endpointManager;
     }
 
     public CacheManager getCacheManager() {

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java Mon Oct  9 16:51:03 2006
@@ -0,0 +1,22 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports;
+
+public interface Endpoint {
+    //No members are declared here; EndpointManager keeps a Set of this type
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java Mon Oct  9 16:51:03 2006
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class EndpointManager {
+    //Endpoints currently registered; wrapped by Collections.synchronizedSet
+    Set<Endpoint> endpoints = Collections.synchronizedSet(new HashSet<Endpoint>());
+    //Adds the endpoint; as a Set, an endpoint equal to an existing entry is not added twice
+    public void addEndpoint(Endpoint endpoint){
+	endpoints.add(endpoint);
+    }
+    //Removes the entry equal to the given endpoint, if there is one
+    public void removeEndpoint(Endpoint endpoint){
+	endpoints.remove(endpoint);
+    }
+    //Tells whether an entry equal to the given endpoint is registered
+    public boolean contains(Endpoint endpoint){
+	return endpoints.contains(endpoint);
+    }
+    //Number of registered endpoints
+    public int size(){
+	return endpoints.size();
+    }
+    //Returns the live synchronized set (not a copy); synchronize on it while iterating
+    public Set<Endpoint> getEndpoints() {
+        return endpoints;
+    }
+    
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java Mon Oct  9 16:51:03 2006
@@ -94,8 +94,9 @@
         bcr.reset(channel, commandBuffer);
         count = bcr.readBuffer(length);
         if (count < 0) {
+            //Remove the endpoint from the list of clients
+            infoHolder.getEndpointManager().removeEndpoint(new TCPEndpoint(channel));
             channel.close();
-            //TODO - remove the peer as the socket was closed
             return;
         }
         if (count < length) {

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Mon Oct  9 16:51:03 2006
@@ -18,11 +18,22 @@
  */
 package org.apache.geronimo.gcache.transports.tcp;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.Element;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.BaseCommand;
 import org.apache.geronimo.gcache.command.BulkSendCommand;
 import org.apache.geronimo.gcache.command.ClearCacheCommand;
 import org.apache.geronimo.gcache.command.GetCacheCommand;
@@ -34,149 +45,195 @@
 import org.apache.geronimo.gcache.transports.CommandVisitor;
 import org.apache.geronimo.gcache.util.BufferChannelWriter;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 public class TCPCommandVisitor implements CommandVisitor {
     Log log = LogFactory.getLog(TCPCommandVisitor.class);
 
     private CacheInfoHolder infoHolder;
+
     private SelectionKey key;
 
     public TCPCommandVisitor(CacheInfoHolder infoHolder, SelectionKey key) {
-        this.key = key;
-        this.infoHolder = infoHolder;
+	this.key = key;
+	this.infoHolder = infoHolder;
     }
 
     public void processRemoveSession(RemoveSessionCommand command) {
 
-        Cache cache = infoHolder.getCache(command.getCacheName(), true);
+	Cache cache = infoHolder.getCache(command.getCacheName(), true);
 
-        //Be sure a session was sent
-        if (command.hasSession()) {
-            cache.remove(command.getSessionId());
-        }
+	// Be sure a session was sent
+	if (command.hasSession()) {
+	    cache.remove(command.getSessionId());
+	}
 
-        //Notify peers
-        infoHolder.getCacheNotifier().notifyRemoveSession(command);
+	// Notify peers
+	infoHolder.getCacheNotifier().notifyRemoveSession(command);
     }
 
+    @SuppressWarnings("unchecked")
     public void processRemoveEntry(RemoveEntryCommand command) {
 
+	Cache cache = infoHolder.getCache(command.getCacheName(), true);
 
-        Cache cache = infoHolder.getCache(command.getCacheName(), true);
-
-        //Check if we are using sessions
-        if (command.hasSession()) {
+	// Check if we are using sessions
+	if (command.hasSession()) {
 
-            Map sessionMap = null;
+	    Map sessionMap = null;
 
-            //We are so use the session maps that is stored
-            Element element = cache.get(command.getSessionId());
-            if (element != null) {
-                sessionMap = (Map) element.getObjectValue();
-            } else {
-                sessionMap = Collections.synchronizedMap(new HashMap());
-            }
+	    // We are, so use the stored session map
+	    Element element = cache.get(command.getSessionId());
+	    if (element != null) {
+		sessionMap = (Map) element.getObjectValue();
+	    } else {
+		sessionMap = Collections.synchronizedMap(new HashMap());
+	    }
 
-            sessionMap.remove(command.getHashableKey());
+	    sessionMap.remove(command.getHashableKey());
 
-            //Put the session away
-            cache.put(new Element(command.getSessionId(), sessionMap));
+	    // Put the session away
+	    cache.put(new Element(command.getSessionId(), sessionMap));
 
-        } else {
+	} else {
 
-            //No session map so store the value
-            cache.remove(command.getHashableKey());
-        }
+	    // No session map so remove the value directly from the cache
+	    cache.remove(command.getHashableKey());
+	}
 
-        //Notify peers
-        infoHolder.getCacheNotifier().notifyRemove(command);
+	// Notify peers
+	infoHolder.getCacheNotifier().notifyRemove(command);
     }
 
     public void processPutSession(PutSessionCommand command) {
-        Cache cache = infoHolder.getCache(command.getCacheName(), true);
+	Cache cache = infoHolder.getCache(command.getCacheName(), true);
 
-        //Place the raw session in the cache
-        try {
-            cache.put(new Element(command.getSessionId(), command.getRawSessionFromPayload()));
-
-            //Ack the message
-            MessageAckCommand ack = new MessageAckCommand();
-            ack.setMessageId(command.getCommandId());
-            byte [] packet = ack.createPacket(true);
-            int written = sendPacket(packet);
-            if (written == -1) {
-                //TODO - This means the socket is dead and need peer removal
-            }
-
-        } catch (IOException e) {
-            log.info(e);
-        }
+	// Place the raw session in the cache
+	try {
+	    cache.put(new Element(command.getSessionId(), command
+		    .getRawSessionFromPayload()));
+
+	    // Ack the message
+	    MessageAckCommand ack = new MessageAckCommand();
+	    ack.setMessageId(command.getCommandId());
+	    byte[] packet = ack.createPacket(true);
+	    sendPacket(packet);
+
+	} catch (IOException e) {
+	    // TODO - What should we do on an IOException, ignore it or
+                // remove the client?
+	    log.error(e);
+	}
 
-        //Notify peers
-        infoHolder.getCacheNotifier().notifyPutSession(command);
+	// Notify peers
+	infoHolder.getCacheNotifier().notifyPutSession(command);
     }
 
+    @SuppressWarnings({ "unchecked" })
     public void processPutEntry(PutEntryCommand command) {
-        Cache cache = infoHolder.getCache(command.getCacheName(), true);
-
-        //Check if we are using sessions
-        if (command.hasSession()) {
-
-            Map sessionMap = null;
-
-            //We are so use the session maps that is stored
-            Element element = cache.get(command.getSessionId());
-            if (element != null) {
-                sessionMap = (Map) element.getObjectValue();
-            } else {
-                sessionMap = Collections.synchronizedMap(new HashMap());
-            }
+	Cache cache = infoHolder.getCache(command.getCacheName(), true);
 
-            sessionMap.put(command.getHashableKey(), command.getRawPayload());
-            //Put the session away
-            cache.put(new Element(command.getSessionId(), sessionMap));
+	// Check if we are using sessions
+	if (command.hasSession()) {
 
-        } else {
+	    Map sessionMap = null;
 
-            //No session map so store the value
-            cache.put(new Element(command.getHashableKey(), command.getRawPayload()));
-        }
+	    // We are, so use the stored session map
+	    Element element = cache.get(command.getSessionId());
+	    if (element != null) {
+		sessionMap = (Map) element.getObjectValue();
+	    } else {
+		sessionMap = Collections.synchronizedMap(new HashMap());
+	    }
+
+	    sessionMap.put(command.getHashableKey(), command.getRawPayload());
+	    // Put the session away
+	    cache.put(new Element(command.getSessionId(), sessionMap));
+
+	} else {
+
+	    // No session map so store the value
+	    cache.put(new Element(command.getHashableKey(), command
+		    .getRawPayload()));
+	}
 
-        //Notify peers
-        infoHolder.getCacheNotifier().notifyPut(command);
+	// Notify peers
+	infoHolder.getCacheNotifier().notifyPut(command);
     }
 
     public void processMessageAck(MessageAckCommand command) {
     }
 
+    //Registers the requesting client as an endpoint, then sends it the cache contents:
+    //a BulkSendCommand announcing the entry count, followed by one put command per key.
+    @SuppressWarnings("unchecked")
     public void processGetCache(GetCacheCommand command) {
+	Cache cache = infoHolder.getCache(command.getCacheName(), true);
+	
+	//Add the client endpoint
+	infoHolder.getEndpointManager().addEndpoint(new TCPEndpoint(key));
+	
+	//Send a bulk command
+	BulkSendCommand bulk = new BulkSendCommand();
+	bulk.setNumberOfCommands(cache.getSize());
+	try {
+	    if (sendPacket(bulk.createPacket(true)) < 0) {
+		return;
+	    }
+
+	    for (Object cacheKey : (List<Object>) cache.getKeys()) {
+		Element element = cache.get(cacheKey);
+		Object payload = element.getValue();
+		BaseCommand newCommand = null;
+		// Test if we are sending a session or not.  Session maps are cached as
+		// Collections.synchronizedMap wrappers, which are Maps but not HashMaps.
+		if (payload instanceof Map) {
+		    PutSessionCommand psc = new PutSessionCommand();
+		    psc.setCacheName(command.getCacheName());
+		    psc.setSessionId((String) cacheKey);
+		    psc.setPayloadFromSession((Map) payload);
+		    newCommand = psc;
+		} else {
+		    PutEntryCommand pec = new PutEntryCommand();
+		    pec.setCacheName(command.getCacheName());
+		    pec.setRawPayload((byte[]) payload);
+		    pec.setRawKey((byte []) cacheKey);
+		    newCommand = pec;
+		}
+		
+		//Send the packet.  If there is a failure just abort
+		if (sendPacket(newCommand.createPacket(false)) < 0)
+		    return;
+	    }
+	} catch (IOException e) {
+	    // TODO - What should we do on an IOException, ignore it or
+                // remove the client?
+	    log.error(e);
+	}
     }
 
     public void processClearCache(ClearCacheCommand command) {
-        Cache cache = infoHolder.getCache(command.getCacheName(), true);
-        cache.removeAll();
+	Cache cache = infoHolder.getCache(command.getCacheName(), true);
+	cache.removeAll();
 
-        //Notify peers
-        infoHolder.getCacheNotifier().notifyClearCache(command);
+	// Notify peers
+	infoHolder.getCacheNotifier().notifyClearCache(command);
     }
 
     public void processBulkSend(BulkSendCommand command) {
     }
 
-    private int sendPacket(byte []packet) throws IOException {
-        //This line if for tests...key should not be null
-        if (key == null)
-            return 0;
-        SocketChannel channel = (SocketChannel) key.channel();
-        BufferChannelWriter bcw = new BufferChannelWriter(channel, ByteBuffer.wrap(packet));
-        int written = bcw.writeBuffer(packet.length);
-        return written;
+    private int sendPacket(byte[] packet) throws IOException {
+	// This line is for tests...key should not be null
+	if (key == null)
+	    return 0;
+	SocketChannel channel = (SocketChannel) key.channel();
+	BufferChannelWriter bcw = new BufferChannelWriter(channel, ByteBuffer
+		.wrap(packet));
+	int written = bcw.writeBuffer(packet.length);
+	if (written == -1) {
+	    // Remove the endpoint from the list of clients
+	    infoHolder.getEndpointManager().removeEndpoint(
+		    new TCPEndpoint(channel));
+	}
+	return written;
     }
 }

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java Mon Oct  9 16:51:03 2006
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.geronimo.gcache.transports.Endpoint;
+
+public class TCPEndpoint extends Object implements Endpoint{
+    //Channel to the remote peer; assigned only by the constructors below
+    private SocketChannel channel;
+
+    public TCPEndpoint(SocketChannel channel) {
+	this.channel = channel;
+    }
+    
+    //Convenience constructor to extract channel from the key
+    public TCPEndpoint(SelectionKey key) {
+	channel = (SocketChannel) key.channel();
+    }
+
+    public SocketChannel getChannel() {
+        return channel;
+    }
+    //Equal only to another TCPEndpoint whose channel has the same remote InetAddress.
+    //NOTE(review): the port is ignored, so two connections from one host compare equal.
+    @Override
+    public boolean equals(Object obj) {
+	if (!(obj instanceof TCPEndpoint)) return false;
+	return channel.socket().getInetAddress().equals(((TCPEndpoint) obj).channel.socket().getInetAddress());
+    }
+    //Hashes the same remote InetAddress that equals() compares, keeping the two consistent
+    @Override
+    public int hashCode() {
+	return channel.socket().getInetAddress().hashCode();
+    }
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java (added)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java Mon Oct  9 16:51:03 2006
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import net.sf.ehcache.CacheManager;
+
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
+import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.transports.TransportServer;
+import org.apache.geronimo.gcache.util.BufferChannelReader;
+import org.apache.geronimo.gcache.util.BufferChannelWriter;
+import org.apache.geronimo.gcache.util.ByteArrayInputStream;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
+
+public abstract class AbstractServer {
+    protected static final int port = 45678;
+    protected static final String host = "localhost";
+    //Fixture state used by subclasses; assigned in setUp()
+    protected TransportServer server = null;
+    protected ThreadPool pool;
+    protected SocketChannel clientChannel;
+    protected CacheInfoHolder info;
+    //Starts a TCPSocketTransportServer on host:port and connects one client channel to it
+    @BeforeClass
+    public void setUp() throws Exception{
+        pool = new DefaultThreadPoolImpl(10);
+        CacheManager mgr = CacheManager.create();
+        info = new CacheInfoHolder(mgr);
+        DefaultSelectionKeyProcessorFactory factory = new DefaultSelectionKeyProcessorFactory(info);
+
+        server = new TCPSocketTransportServer(host, port, pool, 2000, factory);
+
+        server.start();
+        
+        //Create a client socket
+        clientChannel = SocketChannel.open();
+        clientChannel.connect(new InetSocketAddress(host, port));
+    }
+    //Stops the server, shuts down the pool and closes the client channel
+    @AfterClass(alwaysRun=true)
+    public void shutdown() throws Exception{
+        server.stop();
+        pool.shutdown();
+        
+        clientChannel.close();
+    }
+    //Writes the command's packet to the client channel and asserts every byte was written
+    protected void sendCommand(BaseCommand command) throws IOException{
+	
+	byte bytes[] = command.createPacket(true);
+	BufferChannelWriter bcw = new BufferChannelWriter(clientChannel, ByteBuffer.wrap(bytes));
+        int written = bcw.writeBuffer(bytes.length);
+        assert written == bytes.length;
+	
+    }
+    //Reads one packet from the client channel, checks magic and CRC32, and unmarshals the command
+    protected Command readCommand() throws IOException{
+	//Header is MAGIC plus 13 bytes: 1-byte command id, 8-byte checksum, 4-byte command length
+        ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13);
+        BufferChannelReader bcr = new BufferChannelReader(clientChannel, receiveHeader);
+        int read = bcr.readBuffer(receiveHeader.capacity());
+        assert read == Constants.MAGIC.length + 13;
+        receiveHeader.flip();
+
+        //Read the magic
+        byte magic[] = new byte[Constants.MAGIC.length];
+        receiveHeader.get(magic);
+
+        //Better match the Magic
+        assert Arrays.equals(Constants.MAGIC, magic);
+
+        //Get the command
+        byte commandIdentifier = receiveHeader.get();
+
+        //Get the checksum
+        long checksum = receiveHeader.getLong();
+
+        //Get the command length
+        int length = receiveHeader.getInt();
+
+        //pull the command
+        ByteBuffer commandBuffer = ByteBuffer.allocate(length);
+        bcr.reset(clientChannel, commandBuffer);
+        int count = bcr.readBuffer(length);
+        assert count == length;
+
+        //Calc a checksum
+        byte commandArray[] = commandBuffer.array();
+        Checksum calcChecksum = new CRC32();
+        calcChecksum.update(commandArray, 0, commandArray.length);
+        long newCheck  = calcChecksum.getValue();
+
+        //Checksums match
+        assert newCheck == checksum;
+        
+        //Now create the command
+        ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
+        ReadableByteChannel readChannel = Channels.newChannel(bias);
+
+        //Create the command and unmarshal the data
+        Command command = CommandTypes.createCommand(commandIdentifier);
+        command.readExternal(readChannel);
+        
+        return command;
+
+    }
+}

Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (added)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java Mon Oct  9 16:51:03 2006
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import org.apache.geronimo.gcache.command.BulkSendCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.GetCacheCommand;
+import org.apache.geronimo.gcache.transports.Endpoint;
+import org.testng.annotations.Test;
+
+public class TCPEndpointTest extends AbstractServer {
+    //Sends a GetCacheCommand for "Cache1" and checks the reply and the endpoint registration
+    @Test
+    public void testJoinCluster() throws Exception{
+	//No endpoint is expected before the GetCacheCommand is sent
+        assert 0 == info.getEndpointManager().size();
+        
+	GetCacheCommand command = new GetCacheCommand();
+	
+	command.setCacheName("Cache1");
+	
+        //Send the packet
+	sendCommand(command);
+	
+        //Now receive any data (it should be a BulkSendCommand)
+        Command bulk = this.readCommand(); 
+
+        //Is the message the type we think it is?
+        assert bulk instanceof BulkSendCommand;
+
+        int commandsToFollow = ((BulkSendCommand)bulk).getNumberOfCommands();
+        
+        //Nothing is in the Cache, so no commands should follow
+        assert commandsToFollow == 0; 
+        
+        //Should have one client
+        assert 1 == info.getEndpointManager().size();
+        //Take the single registered endpoint and look at its channel
+        Set<Endpoint> set = info.getEndpointManager().getEndpoints();
+        TCPEndpoint endpoint =  (TCPEndpoint)set.iterator().next();
+        SocketChannel endpointChannel = endpoint.getChannel();
+        
+        //Check that the socket addresses match (Remote on server == Local for client)
+        assert endpointChannel.socket().getRemoteSocketAddress().equals(clientChannel.socket().getLocalSocketAddress());
+        
+        
+    }
+
+}

Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Mon Oct  9 16:51:03 2006
@@ -16,50 +16,22 @@
  */
 package org.apache.geronimo.gcache.transports.tcp;
 
-import net.sf.ehcache.CacheManager;
-import org.apache.geronimo.gcache.CacheInfoHolder;
-import org.apache.geronimo.gcache.util.BufferChannelReader;
-import org.apache.geronimo.gcache.command.PutSessionCommand;
-import org.apache.geronimo.gcache.command.BaseCommand;
-import org.apache.geronimo.gcache.command.Command;
-import org.apache.geronimo.gcache.command.CommandTypes;
-import org.apache.geronimo.gcache.command.MessageAckCommand;
-import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
-import org.apache.geronimo.gcache.transports.TransportServer;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.Test;
-
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.Channels;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Arrays;
-import java.util.zip.Checksum;
-import java.util.zip.CRC32;
-import java.io.ByteArrayInputStream;
-
-public class TcpSocketServerTest {
 
-    private static final int port = 45678;
-    private static final String host = "localhost";
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.testng.annotations.Test;
 
-    TransportServer server = null;
-    ThreadPool pool;
+public class TcpSocketServerTest extends AbstractServer {
 
     @Test()
-    public void sendSession() throws Exception {
-
-        //Create a client socket
-        SocketChannel channel = SocketChannel.open();
-        channel.connect(new InetSocketAddress(host, port));
+    public void sendSession()  throws Exception {
 
         //Create a session
-        Map session = new HashMap();
+        Map<String,String> session = new HashMap<String, String>();
         session.put("key1","data1");
         session.put("key2","data2");
         session.put("key3","data3");
@@ -70,53 +42,10 @@
 
         //Send the packet
         ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket(true));
-        channel.write(commandBuffer);
+        clientChannel.write(commandBuffer);
 
         //Now receive any data (it Should be a MessageAck)
-        ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13);
-        BufferChannelReader bcr = new BufferChannelReader(channel, receiveHeader);
-        int read = bcr.readBuffer(receiveHeader.capacity());
-        assert read == Constants.MAGIC.length + 13;
-        receiveHeader.flip();
-
-        //Read the magic
-        byte magic[] = new byte[Constants.MAGIC.length];
-        receiveHeader.get(magic);
-
-        //Better match the Magic
-        assert Arrays.equals(Constants.MAGIC, magic);
-
-        //Get the command
-        byte commandIdentifier = receiveHeader.get();
-
-        //Get the checksum
-        long checksum = receiveHeader.getLong();
-
-        //Get the command length
-        int length = receiveHeader.getInt();
-
-        //pull the command
-        commandBuffer = ByteBuffer.allocate(length);
-        bcr.reset(channel, commandBuffer);
-        int count = bcr.readBuffer(length);
-        assert count == length;
-
-        //Calc a checksum
-        byte commandArray[] = commandBuffer.array();
-        Checksum calcChecksum = new CRC32();
-        calcChecksum.update(commandArray, 0, commandArray.length);
-        long newCheck  = calcChecksum.getValue();
-
-        //Checksums match
-        assert newCheck == checksum;
-
-        //Now create the command
-        ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
-        ReadableByteChannel readChannel = Channels.newChannel(bias);
-
-        //Create the command and unmarshal the data
-        Command ackCommand = CommandTypes.createCommand(commandIdentifier);
-        ackCommand.readExternal(readChannel);
+        Command ackCommand = this.readCommand(); 
 
         //Is the message the type we think it is?
         assert ackCommand instanceof MessageAckCommand;
@@ -125,24 +54,5 @@
         assert command.getCommandId() == ((MessageAckCommand)ackCommand).getMessageId();
 
     }
-
-
-  @BeforeSuite
-  public void setUp() throws Exception{
-      pool = new DefaultThreadPoolImpl(10);
-      CacheManager mgr = CacheManager.create();
-      CacheInfoHolder info = new CacheInfoHolder(mgr);
-      DefaultSelectionKeyProcessorFactory factory = new DefaultSelectionKeyProcessorFactory(info);
-
-      server = new TCPSocketTransportServer(host, port, pool, 2000, factory);
-
-      server.start();
-  }
-
-  @AfterSuite(alwaysRun=true)
-  public void shutdown() throws Exception{
-      server.stop();
-      pool.shutdown();
-  }
 
 }