You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/09/28 09:20:23 UTC

svn commit: r450728 - in /geronimo/sandbox/gcache/server/src: main/java/org/apache/geronimo/gcache/ main/java/org/apache/geronimo/gcache/command/ main/java/org/apache/geronimo/gcache/transports/ main/java/org/apache/geronimo/gcache/transports/tcp/ main...

Author: jgenender
Date: Thu Sep 28 00:20:20 2006
New Revision: 450728

URL: http://svn.apache.org/viewvc?view=rev&rev=450728
Log:
Begin communicating with the cache through the TCPTransport

Added:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java   (with props)
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java   (contents, props changed)
      - copied, changed from r448152, geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java   (with props)
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java   (with props)
Removed:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java
Modified:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java Thu Sep 28 00:20:20 2006
@@ -24,8 +24,8 @@
 import org.apache.geronimo.gcache.server.listeners.DefaultCacheNotifier;
 
 public class CacheInfoHolder {
-    public final CacheManager cacheManager;
-    public CacheNotifier cacheNotifier = null;
+    private final CacheManager cacheManager;
+    private CacheNotifier cacheNotifier = null;
 
     public CacheInfoHolder(CacheManager cacheManager) {
         this.cacheManager = cacheManager;

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java Thu Sep 28 00:20:20 2006
@@ -37,8 +37,8 @@
             execute();
         }
 
-        running = false;
         cleanUp();
+        running = false;
     }
 
     protected abstract void initialize();

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java Thu Sep 28 00:20:20 2006
@@ -18,25 +18,32 @@
  */
 package org.apache.geronimo.gcache.command;
 
-import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.transports.tcp.Constants;
 import org.apache.geronimo.gcache.util.ByteArrayInputStream;
+import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
 import org.apache.geronimo.gcache.util.UniqueId;
-import org.apache.geronimo.gcache.marshal.MarshalAware;
-import org.apache.geronimo.gcache.CacheInfoHolder;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.io.ObjectOutputStream;
 import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
-import java.nio.channels.Channels;
 import java.nio.charset.Charset;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 
 public class BaseCommand implements Command {
 
     private long commandId = 0;
+    private Checksum checksum = null;
+
+    public byte getCommandType() throws IOException {
+        throw new IOException("Invalid command type");
+    }
 
     public long getCommandId() {
         if (commandId == 0){
@@ -67,6 +74,45 @@
             commandId = UniqueId.get();
         }
         writeLong(channel, commandId);
+    }
+
+    public byte[] createPacket() throws IOException{
+        // COMMAND TYPE - 1 byte
+        // CHECKSUM - 8 bytes
+        // COMMAND LENGTH - 4 bytes
+        // COMMAND - ? bytes as defined above
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
+        WritableByteChannel channel = Channels.newChannel(baos);
+
+        //Write the command type
+        baos.write(getCommandType());
+
+        //Reserve a placeholder for the checksum
+        baos.write(new byte[] {0,0,0,0,0,0,0,0});
+
+        //Reserve a placeholder for the length
+        baos.write(new byte[] {0,0,0,0});
+
+        //Marshal the command
+        writeExternal(channel);
+        channel.close();
+
+        int commandStart = 13;
+
+        //Get the entire command
+        byte command[] = baos.toByteArray();
+
+        int len = command.length - commandStart;
+
+        Checksum checksum = new CRC32();
+        checksum.update(command, commandStart, len);
+
+        //Write the checksum
+        ByteBuffer.wrap(command, 1, 8).putLong(checksum.getValue());
+
+        //Write the length of the command
+        ByteBuffer.wrap(command, 9, 4).putInt(len);
+        return command;
     }
 
     public byte[] marshal() throws IOException {

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java Thu Sep 28 00:20:20 2006
@@ -28,6 +28,10 @@
 
     private int numberOfCommands;
 
+    public byte getCommandType() throws IOException {
+        return CommandTypes.BULK_SEND_COMMAND;
+    }
+
     public int getNumberOfCommands() {
         return numberOfCommands;
     }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java Thu Sep 28 00:20:20 2006
@@ -84,7 +84,7 @@
 
         //Process what we want read
         cacheName = readString(channel);
-        key = this.readBytes(channel);
+        key = readBytes(channel);
         sessionId = readString(channel);
     }
 

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java Thu Sep 28 00:20:20 2006
@@ -25,6 +25,10 @@
 
 public class ClearCacheCommand extends CacheBaseCommand {
 
+    public byte getCommandType() throws IOException {
+        return CommandTypes.CLEARCACHE_COMMAND;
+    }
+
     public void execute(CacheInfoHolder info) throws IOException {
         Cache cache = info.getCache(getCacheName(), true);
         cache.removeAll();

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java Thu Sep 28 00:20:20 2006
@@ -28,7 +28,9 @@
  * invoked upon reciept.
  */
 public interface Command extends MarshalAware {
-    long getCommandId();
+    public long getCommandId();
 
-    void execute(CacheInfoHolder info) throws IOException;
+    public byte getCommandType() throws IOException;
+
+    public void execute(CacheInfoHolder info) throws IOException;
 }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java Thu Sep 28 00:20:20 2006
@@ -1,13 +1,14 @@
 package org.apache.geronimo.gcache.command;
 
 public class CommandTypes {
-    public static final int PUT_ENTRY_COMMAND = 1;
-    public static final int REMOVE_ENTRY_COMMAND = 2;
-    public static final int CLEARCACHE_COMMAND = 3;
-    public static final int MESSAGE_ACK_COMMAND = 4;
-    public static final int BULK_SEND_COMMAND = 5;
-    public static final int REMOVE_SESSION_COMMAND = 6;
-    public static final int PUT_SESSION_COMMAND = 7;
+    public static final byte PUT_ENTRY_COMMAND = 1;
+    public static final byte REMOVE_ENTRY_COMMAND = 2;
+    public static final byte CLEARCACHE_COMMAND = 3;
+    public static final byte MESSAGE_ACK_COMMAND = 4;
+    public static final byte BULK_SEND_COMMAND = 5;
+    public static final byte REMOVE_SESSION_COMMAND = 6;
+    public static final byte PUT_SESSION_COMMAND = 7;
+    public static final byte GET_CACHE_COMMAND = 8;
 
     public static Command createCommand(int identifier) {
         Command command = null;
@@ -32,6 +33,9 @@
                 break;
             case(PUT_SESSION_COMMAND):
                 command = new PutSessionCommand();
+                break;
+            case(GET_CACHE_COMMAND):
+                command = new GetCacheCommand();
                 break;
         }
         return command;

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java Thu Sep 28 00:20:20 2006
@@ -25,15 +25,19 @@
 
 import net.sf.ehcache.Cache;
 
+/**
+ * This class represents a request to obtain a copy of an
+ * entire cache.  The sender should set the cache name, and
+ * the receiver will acknowledge this by sending a BulkSendCommand
+ * back to the client, followed by PutSessionCommands (for sessions)
+ * and PutEntryCommands for non-Session data.
+ */
 public class GetCacheCommand extends CacheBaseCommand{
 
-    public void execute(CacheInfoHolder info) throws IOException {
-        //TODO - Do we lock the Cache???
-        Cache cache = info.getCache(getCacheName(), true);
-
-        List list = cache.getKeys();
-        
-        int size = list.size();
+    public byte getCommandType() throws IOException {
+        return CommandTypes.GET_CACHE_COMMAND;
+    }
 
+    public void execute(CacheInfoHolder info) throws IOException {
     }
 }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java Thu Sep 28 00:20:20 2006
@@ -28,6 +28,10 @@
 
     private long messageId = 0;
 
+    public byte getCommandType() throws IOException {
+        return CommandTypes.MESSAGE_ACK_COMMAND;
+    }
+
     public long getMessageId() {
         return messageId;
     }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java Thu Sep 28 00:20:20 2006
@@ -30,6 +30,10 @@
 
 public class PutEntryCommand extends CachePayloadBaseCommand {
 
+    public byte getCommandType() throws IOException {
+        return CommandTypes.PUT_ENTRY_COMMAND;
+    }
+
     public void execute(CacheInfoHolder info) throws IOException {
         Cache cache = info.getCache(getCacheName(), true);
 

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java Thu Sep 28 00:20:20 2006
@@ -40,6 +40,10 @@
 
     private int entryCount;
 
+    public byte getCommandType() throws IOException {
+        return CommandTypes.PUT_SESSION_COMMAND;
+    }
+
     public void readExternal(ReadableByteChannel channel) throws IOException {
         super.readExternal(channel);
         entryCount = readInt(channel);
@@ -264,6 +268,6 @@
         cache.put(new Element(getSessionId(), getRawSessionFromPayload()));
 
         //Notify peers
-        info.cacheNotifier.notifyPutSession(this);
+        info.getCacheNotifier().notifyPutSession(this);
     }
 }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java Thu Sep 28 00:20:20 2006
@@ -30,6 +30,11 @@
 import net.sf.ehcache.Cache;
 
 public class RemoveEntryCommand extends CacheBaseCommand{
+
+    public byte getCommandType() throws IOException {
+        return CommandTypes.REMOVE_ENTRY_COMMAND;
+    }
+
     public void execute(CacheInfoHolder info) throws IOException {
         Cache cache = info.getCache(getCacheName(), true);
 

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java Thu Sep 28 00:20:20 2006
@@ -25,6 +25,10 @@
 
 public class RemoveSessionCommand extends CacheBaseCommand{
 
+    public byte getCommandType() throws IOException {
+        return CommandTypes.REMOVE_SESSION_COMMAND;
+    }
+
     public void execute(CacheInfoHolder info) throws IOException {
         Cache cache = info.getCache(getCacheName(), true);
 

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java?view=auto&rev=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java Thu Sep 28 00:20:20 2006
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.transports;
+
+import java.io.IOException;
+
+public interface TransportServer {
+
+    public void start() throws IOException;
+    public void stop() throws IOException;
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java Thu Sep 28 00:20:20 2006
@@ -18,57 +18,108 @@
  */
 package org.apache.geronimo.gcache.transports.tcp;
 
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.util.BufferChannelInputStream;
+import org.apache.geronimo.gcache.util.BufferChannelReader;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.geronimo.gcache.command.BaseCommand;
+
 import java.io.IOException;
+import java.io.ByteArrayInputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.Channels;
 import java.util.Arrays;
-
-import org.apache.geronimo.gcache.command.Command;
-import org.apache.geronimo.gcache.command.CommandTypes;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
 
 public class DefaultSelectionKeyProcessor implements SelectionKeyProcessor {
 
-    private final static int HEADER_SIZE = Constants.MAGIC.length + 4;
+    private final static int HEADER_SIZE = Constants.MAGIC.length + 1 + 8 + 4;
+    private CacheInfoHolder infoHolder = null;
+
+    public DefaultSelectionKeyProcessor(CacheInfoHolder infoHolder) {
+        this.infoHolder = infoHolder;
+    }
 
     public void process(SelectionKey key) throws IOException {
         SocketChannel channel = (SocketChannel) key.channel();
 
-        // first byte is the identifier, the next 4 are the checksum
-        ByteBuffer buffer = ByteBuffer.allocateDirect(HEADER_SIZE);
-        int count;
-        while ((count = channel.read(buffer)) > 0) {
-            //Better at least have the size of the HEADER
-            if (count < HEADER_SIZE){
-                disconnect(channel);
-                return;
-            }
-
-            buffer.flip();
-
-            byte magic[] = new byte[Constants.MAGIC.length];
-            buffer.get(magic);
-
-            //Better match the Magic
-            if (!Arrays.equals(Constants.MAGIC, magic)){
-                disconnect(channel);
-                return;
-            }
-
-            int commandIdentifier = buffer.getInt();
-            Command command = CommandTypes.createCommand(commandIdentifier);
-            command.readExternal(channel);
-            //TODO execute the command
-//            command.execute();
+        // HEADER BREAKDOWN
+        // ---------------------------------------------------------
+        // MAGIC HEADER - 6 bytes
+        // COMMAND TYPE - 1 byte
+        // CHECKSUM - 8 bytes
+        // COMMAND LENGTH - 4 bytes
+        // COMMAND - ? bytes as defined above
+        ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
+
+        BufferChannelReader bcr = new BufferChannelReader(channel, buffer);
+        int count = bcr.readBuffer(HEADER_SIZE);
+        if (count < HEADER_SIZE) {
+            //Incomplete header, so disconnect and bail out
+            disconnect(channel);
+            return;
+        }
+        buffer.flip();
+
+        //Read the magic
+        byte magic[] = new byte[Constants.MAGIC.length];
+        buffer.get(magic);
+
+        //Better match the Magic
+        if (!Arrays.equals(Constants.MAGIC, magic)) {
+            //Magic doesn't match, so close the socket
+            disconnect(channel);
+            return;
         }
 
-        // Test for closed connection
+        //Get the command
+        byte commandIdentifier = buffer.get();
+
+        //Get the checksum
+        long checksum = buffer.getLong();
+
+        //Get the command length
+        int length = buffer.getInt();
+
+        //TODO - This really should be done with a ByteBuffer pool
+        ByteBuffer commandBuffer = ByteBuffer.allocate(length);
+        count = channel.read(commandBuffer);
+        if (count < length) {
+            //Command was bad
+            //TODO - Send back a resend?
+
+            //Exit out of here
+//            break;
+        }
+
+        byte commandArray[] = commandBuffer.array();
+        Checksum calcChecksum = new CRC32();
+        calcChecksum.update(commandArray, 0, commandArray.length);
+        long newCheck  = calcChecksum.getValue();
+
+        ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
+        ReadableByteChannel readChannel = Channels.newChannel(bias);
+
+        //Create the command and unmarshal the data
+        Command command = CommandTypes.createCommand(commandIdentifier);
+        command.readExternal(readChannel);
+
+        //Do something with the data
+        command.execute(infoHolder);
+
+        // Test for closed connection on the client side
         if (count < 0) {
             channel.close();
+            //TODO - remove the peer
         }
     }
 
-    private void disconnect(SocketChannel channel) throws IOException{
+    private void disconnect(SocketChannel channel) throws IOException {
         try {
             channel.socket().shutdownOutput();
         } catch (IOException e) {

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java Thu Sep 28 00:20:20 2006
@@ -38,17 +38,10 @@
     private Selector selector = null;
     private int timeOut = 0;
 
-    public TCPSocketHandler(int timeOut) throws IOException{
-        this(timeOut, null);
-    }
-
     public TCPSocketHandler(int timeOut, SelectionKeyProcessor processor) throws IOException{
         this.timeOut = timeOut;
 
-        if (processor == null)
-            this.processor = new DefaultSelectionKeyProcessor();
-        else
-            this.processor = processor;
+        this.processor = processor;
 
         selector = Selector.open();
     }
@@ -91,10 +84,23 @@
                     processor.process(key);
                 }
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             e.printStackTrace();
-        } catch (InterruptedException e) {
-            log.error("InterruptedException occured", e);
+        }
+    }
+
+    public void halt() {
+        super.halt();
+
+        //Wake up the selector if it's in a wait state
+        selector.wakeup();
+
+        //Wait until everything is shutdown
+        while (isRunning()){
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+            }
         }
     }
 

Copied: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java (from r448152, geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java)
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java?view=diff&rev=450728&p1=geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java&r1=448152&p2=geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java Thu Sep 28 00:20:20 2006
@@ -17,6 +17,7 @@
 package org.apache.geronimo.gcache.transports.tcp;
 
 import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.transports.TransportServer;
 
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SelectionKey;
@@ -24,7 +25,7 @@
 import java.net.ServerSocket;
 import java.io.IOException;
 
-public class TCPSocketServer {
+public class TCPSocketTransportServer implements TransportServer {
 
     private ServerSocketChannel server = null;
     private InetSocketAddress inet = null;
@@ -33,11 +34,7 @@
     private int timeOut = 0;
     private SelectionKeyProcessor processor;
 
-    public TCPSocketServer(String address, int port, ThreadPool threadPool, int timeOut) {
-        this(address, port, threadPool, timeOut, null);
-    }
-
-    public TCPSocketServer(String address, int port, ThreadPool threadPool, int timeOut, SelectionKeyProcessor processor) {
+    public TCPSocketTransportServer(String address, int port, ThreadPool threadPool, int timeOut, SelectionKeyProcessor processor) {
 
         inet = new InetSocketAddress(address, port);
         pool = threadPool;

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java?view=auto&rev=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java Thu Sep 28 00:20:20 2006
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.util;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.Channel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+
+/**
+ * Provides an input stream that reads from a socket channel through a
+ * caller-supplied buffer.
+ * <p>
+ * Bytes already sitting in the buffer are served first. When the buffer is
+ * drained it is refilled from the channel; if nothing is pending, the stream
+ * waits up to {@link #getTimeout()} milliseconds on a temporary selector.
+ * <p>
+ * NOTE(review): refilling registers the channel with a selector, which the
+ * NIO API only permits for channels in non-blocking mode -- confirm that
+ * callers configure the channel that way.
+ */
+public class BufferChannelInputStream extends InputStream {
+
+    private SocketChannel channel = null;
+    private ByteBuffer buffer = null;
+    private long timeout = 10000;
+
+    /**
+     * Wraps the given channel and staging buffer.
+     *
+     * @param channel channel to pull bytes from once the buffer runs dry
+     * @param buffer  staging buffer; it is flipped here, so it is expected to
+     *                arrive in write mode with any pre-read bytes in place
+     */
+    public BufferChannelInputStream(SocketChannel channel, ByteBuffer buffer) {
+        this.channel = channel;
+        this.buffer = buffer;
+        buffer.flip();
+    }
+
+    /**
+     * @return the maximum time, in milliseconds, to wait for inbound data
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @param timeout maximum time, in milliseconds, to wait for inbound data.
+     *                The value is passed straight to
+     *                {@link Selector#select(long)}, where 0 means wait
+     *                indefinitely.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /**
+     * Reads the next byte of data.
+     *
+     * @return next byte of data (0-255) or -1 if it cannot read, either
+     *         because the channel reached EOF or the wait timed out
+     * @throws IOException if reading from the channel fails
+     */
+    public int read() throws IOException {
+        //Refill only once everything buffered has been consumed
+        if (!buffer.hasRemaining() && timedRead() <= 0) {
+            return -1;
+        }
+
+        return buffer.get() & 0xff;
+    }
+
+    /**
+     * Reads the next eight bytes as a long.
+     * <p>
+     * NOTE(review): -1 doubles as the "nothing could be read" marker and is
+     * therefore indistinguishable from a genuine value of -1.
+     *
+     * @return the decoded long, or -1 if the buffer was empty and could not
+     *         be refilled
+     * @throws IOException              if reading from the channel fails
+     * @throws BufferUnderflowException if fewer than 8 bytes are buffered
+     */
+    public long readLong() throws IOException {
+        if (!buffer.hasRemaining() && timedRead() <= 0) {
+            return -1;
+        }
+
+        if (buffer.remaining() < 8) {
+            throw new BufferUnderflowException();
+        }
+        return buffer.getLong();
+    }
+
+    /**
+     * Reads the next four bytes as an int.
+     * <p>
+     * NOTE(review): -1 doubles as the "nothing could be read" marker and is
+     * therefore indistinguishable from a genuine value of -1.
+     *
+     * @return the decoded int, or -1 if the buffer was empty and could not
+     *         be refilled
+     * @throws IOException              if reading from the channel fails
+     * @throws BufferUnderflowException if fewer than 4 bytes are buffered
+     */
+    public int readInt() throws IOException {
+        if (!buffer.hasRemaining() && timedRead() <= 0) {
+            return -1;
+        }
+
+        if (buffer.remaining() < 4) {
+            throw new BufferUnderflowException();
+        }
+        return buffer.getInt();
+    }
+
+    /**
+     * Fills as much of the array as the buffered data allows.
+     *
+     * @see #read(byte[], int, int)
+     */
+    public int read(byte[] bytes) throws IOException {
+        return this.read(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Copies up to <code>len</code> buffered bytes into the array, refilling
+     * the buffer from the channel first when it is empty.
+     *
+     * @param bytes  destination array
+     * @param offset index of the first byte to write in <code>bytes</code>
+     * @param len    maximum number of bytes to copy
+     * @return number of bytes copied, 0 when <code>len</code> is 0, or -1 if
+     *         the buffer was empty and could not be refilled
+     * @throws IOException if reading from the channel fails
+     */
+    public int read(byte[] bytes, int offset, int len) throws IOException {
+        //InputStream contract: a zero-length request never blocks or hits EOF
+        if (len == 0) {
+            return 0;
+        }
+
+        if (!buffer.hasRemaining() && timedRead() <= 0) {
+            return -1;
+        }
+
+        //Hand out no more than what is currently buffered
+        int count = Math.min(len, buffer.remaining());
+        buffer.get(bytes, offset, count);
+
+        return count;
+    }
+
+    /**
+     * Refills the (fully consumed) buffer from the channel, waiting up to the
+     * configured timeout when nothing is immediately available.
+     *
+     * @return number of bytes now buffered, 0 on timeout, or -1 on EOF
+     */
+    private int timedRead() throws IOException {
+        //Reset the buffer so the channel can fill it from the start
+        buffer.clear();
+        try {
+            int read = internalRead();
+
+            //Nothing came inbound...so wait for the channel to become readable
+            if (read == 0) {
+                read = awaitAndRead();
+            }
+            return read;
+        } finally {
+            //Always return to read mode; on timeout or EOF this leaves the
+            //buffer empty instead of exposing stale bytes from earlier reads
+            buffer.flip();
+        }
+    }
+
+    /**
+     * Waits on a temporary selector until the channel is readable or the
+     * timeout elapses, then drains whatever arrived.
+     *
+     * @return bytes read, 0 on timeout or wakeup without data, or -1 on EOF
+     */
+    private int awaitAndRead() throws IOException {
+        Selector selector = Selector.open();
+        try {
+            //Listen in for data on the line
+            channel.register(selector, SelectionKey.OP_READ);
+
+            if (selector.select(timeout) == 0) {
+                //Wakeup or timeout and nothing found
+                return 0;
+            }
+
+            //Only one key is registered, so the channel must be readable
+            return internalRead();
+        } finally {
+            //Closing the selector also deregisters the key created above
+            selector.close();
+        }
+    }
+
+    /**
+     * Reads from the channel until it has nothing more to give or the buffer
+     * is full.
+     *
+     * @return bytes read, or -1 if EOF was hit before any byte arrived. Bytes
+     *         received just ahead of EOF are reported first; the following
+     *         call then returns -1.
+     */
+    private int internalRead() throws IOException {
+
+        int totalBytes = 0;
+        int read;
+
+        while ((read = channel.read(buffer)) > 0) {
+            totalBytes += read;
+        }
+
+        //Test for a closed socket with nothing left to deliver
+        if (read < 0 && totalBytes == 0) {
+            return -1;
+        }
+
+        return totalBytes;
+    }
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java?view=auto&rev=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java Thu Sep 28 00:20:20 2006
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.util;
+
+import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+/**
+ * Reads an expected number of bytes from a socket channel into a
+ * caller-supplied buffer, waiting once (up to 10 seconds) for more data when
+ * the first pass comes up short.
+ * <p>
+ * The buffer is cleared before each read and is left as the channel wrote it,
+ * i.e. it is not flipped by this class.
+ * <p>
+ * NOTE(review): waiting registers the channel with a selector, which the NIO
+ * API only permits for channels in non-blocking mode -- confirm that callers
+ * configure the channel that way.
+ */
+public class BufferChannelReader {
+    private SocketChannel channel = null;
+    private ByteBuffer buffer = null;
+    private long timeout = 10000;
+
+    /**
+     * @param channel channel to read from
+     * @param buffer  destination buffer; it is flipped here and cleared again
+     *                at the start of every {@link #readBuffer(int)} call
+     */
+    public BufferChannelReader(SocketChannel channel, ByteBuffer buffer) {
+        this.channel = channel;
+        this.buffer = buffer;
+        buffer.flip();
+    }
+
+    /**
+     * Clears the buffer and tries to read <code>expectedCount</code> bytes
+     * into it.
+     *
+     * @param expectedCount number of bytes the caller is waiting for
+     * @return total bytes read (possibly fewer or more than expected), 0 if
+     *         the wait for additional data timed out, or -1 on EOF
+     * @throws IOException if reading from the channel fails
+     */
+    public int readBuffer(int expectedCount) throws IOException {
+        return timedRead(expectedCount);
+    }
+
+    /**
+     * Reads what is immediately available and, if that is less than
+     * <code>size</code>, waits once for the channel to become readable and
+     * reads again.
+     * <p>
+     * NOTE(review): a timeout reports 0 even when the first pass already
+     * stored some bytes in the buffer -- confirm callers treat 0 as failure.
+     */
+    private int timedRead(int size) throws IOException {
+        //Reset the buffer
+        buffer.clear();
+        int read = internalRead();
+
+        //Leave on EOF, or if we got the full tomato
+        if (read < 0 || read >= size) {
+            return read;
+        }
+
+        //Came up short...now need to wait for more bytes
+        Selector selector = Selector.open();
+        try {
+            //Listen in for data on the line
+            channel.register(selector, SelectionKey.OP_READ);
+
+            if (selector.select(timeout) == 0) {
+                //Wakeup or timeout and nothing found
+                return 0;
+            }
+
+            //Only one key is registered, so the channel must be readable
+            int moreRead = internalRead();
+            //Error?
+            if (moreRead < 0) {
+                return moreRead;
+            }
+
+            return read + moreRead;
+        } finally {
+            //Closing the selector also deregisters the key created above
+            selector.close();
+        }
+    }
+
+    /**
+     * Reads from the channel until it has nothing more to give or the buffer
+     * is full.
+     *
+     * @return bytes read, or -1 if EOF was hit before any byte arrived
+     */
+    private int internalRead() throws IOException {
+
+        int totalBytes = 0;
+        int read;
+
+        while ((read = channel.read(buffer)) > 0) {
+            totalBytes += read;
+        }
+
+        //Test for a closed socket with nothing left to deliver
+        if (read < 0 && totalBytes == 0) {
+            return -1;
+        }
+
+        return totalBytes;
+    }
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Thu Sep 28 00:20:20 2006
@@ -16,24 +16,70 @@
  */
 package org.apache.geronimo.gcache.transports.tcp;
 
-import org.testng.annotations.Test;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import net.sf.ehcache.CacheManager;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
 import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
+import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.transports.TransportServer;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
 
 public class TcpSocketServerTest {
-    
+
+    private static final int port = 45678;
+    private static final String host = "localhost";
+
+    //Started once in setUp() and stopped in shutdown()
+    TransportServer server = null;
+
+    /**
+     * Connects to the running server and sends the magic bytes followed by
+     * a put-session packet holding three key/value pairs.
+     */
     @Test()
-    public void runServer() throws Exception {
+    public void sendSession() throws Exception {
 
-        /**
-        ThreadPool pool = new DefaultThreadPoolImpl(10);
-        MockSelectionKeyProcessor mock = new MockSelectionKeyProcessor();
-        //TCPSocketServer server = new TCPSocketServer("localhost", 45678, pool, 2000, mock);
-        TCPSocketServer server = new TCPSocketServer("localhost", 45678, pool, 2000);
-        server.start();
-        Thread.sleep(100000);
-        server.stop();
-         **/
+        //Create a client socket
+        SocketChannel channel = SocketChannel.open();
+        try {
+            channel.connect(new InetSocketAddress(host, port));
+
+            //Create a session
+            Map session = new HashMap();
+            session.put("key1","data1");
+            session.put("key2","data2");
+            session.put("key3","data3");
+            PutSessionCommand command = new PutSessionCommand();
+            command.setCacheName("Cache1");
+            command.setSessionId("Session1");
+            command.setPayloadFromSession(session);
+
+            //Send the packet
+            ByteBuffer magic = ByteBuffer.wrap(Constants.MAGIC);
+            ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket());
+            channel.write(magic);
+            channel.write(commandBuffer);
+        } finally {
+            //Release the client socket even if building or sending fails
+            channel.close();
+        }
 
     }
+
+
+    /**
+     * Starts a TCP transport server on host:port, backed by a ten-thread
+     * pool and the default cache manager, before any test runs.
+     */
+    @BeforeSuite
+    public void setUp() throws Exception{
+        ThreadPool pool = new DefaultThreadPoolImpl(10);
+        CacheManager mgr = CacheManager.create();
+        CacheInfoHolder info = new CacheInfoHolder(mgr);
+        DefaultSelectionKeyProcessor kp = new DefaultSelectionKeyProcessor(info);
+
+        server = new TCPSocketTransportServer(host, port, pool, 2000, kp);
+
+        server.start();
+    }
+
+    /**
+     * Stops the server. Runs even when earlier steps failed, so it must
+     * cope with a server that was never created.
+     */
+    @AfterSuite(alwaysRun=true)
+    public void shutdown() throws Exception{
+        //setUp() may have failed before assigning the server
+        if (server != null) {
+            server.stop();
+        }
+    }
+
 }