You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/03 04:24:00 UTC

svn commit: r452292 - in /geronimo/sandbox/gcache/server/src: main/java/org/apache/geronimo/gcache/command/ main/java/org/apache/geronimo/gcache/transports/tcp/ main/java/org/apache/geronimo/gcache/util/ test/java/org/apache/geronimo/gcache/transports/...

Author: jgenender
Date: Mon Oct  2 19:23:59 2006
New Revision: 452292

URL: http://svn.apache.org/viewvc?view=rev&rev=452292
Log:
Get server to reply with ack for put session

Added:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java   (with props)
Modified:
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
    geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
    geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java Mon Oct  2 19:23:59 2006
@@ -32,6 +32,7 @@
 
 import org.apache.geronimo.gcache.marshal.MarshalAware;
 import org.apache.geronimo.gcache.transports.CommandVisitor;
+import org.apache.geronimo.gcache.transports.tcp.Constants;
 import org.apache.geronimo.gcache.util.ByteArrayInputStream;
 import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
 import org.apache.geronimo.gcache.util.UniqueId;
@@ -55,7 +56,7 @@
     /**
      * The command id is used to uniquely identify this command and is
      * specifically used in the case where a response is required.
-     * If the command id is still set to the default 0 a unique id is generated by the 
+     * If the command id is still set to the default 0 a unique id is generated by the
      * UniqueId class.
      * @see org.apache.geronimo.gcache.util.UniqueId
      */
@@ -70,7 +71,7 @@
      * This method is provided for the cases where a specfic id is required to
      * coordinate with another (external) framework. Typical use will not
      * require this method to be called, the default mechanism will suffice.
-     * 
+     *
      * @param commandId
      */
     public void setCommandId(long commandId) {
@@ -88,7 +89,7 @@
     }
 
     /**
-     * @see org.apache.geronimo.gcache.marshal.MarshalAware.readExternal(ReadableByteChannel) 
+     * @see org.apache.geronimo.gcache.marshal.MarshalAware.readExternal(ReadableByteChannel)
      */
     public void readExternal(ReadableByteChannel channel) throws IOException {
         // this is the root so no super impl, others should call super first
@@ -97,7 +98,7 @@
     }
 
     /**
-     * @see org.apache.geronimo.gcache.marshal.MarshalAware.writeExternal(WritableByteChannel) 
+     * @see org.apache.geronimo.gcache.marshal.MarshalAware.writeExternal(WritableByteChannel)
      */
     public void writeExternal(WritableByteChannel channel) throws IOException {
         // this is the root so no super impl, others should call super first
@@ -114,21 +115,23 @@
      * type, the second 8 bytes are a checksum, the next 4 bytes are the command
      * length. The remainder of the bytes will be the result of the
      * writeExternal implementation of the command.
-     * 
+     *
      * @return
      * @throws IOException
      */
-    // TODO: Is this the place for this method? Seems higher level to me, this
-    // class should not know how
-    // to make a packet IMO.
-    public byte[] createPacket() throws IOException{
+    public byte[] createPacket(boolean includeMagic) throws IOException{
         // COMMAND TYPE - 1 byte
         // CHECKSUM - 8 bytes
         // COMMAND LENGTH - 4 bytes
         // COMMAND - ? bytes as defined above/
+        int startFrom = includeMagic ? Constants.MAGIC.length : 0;
+
         ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
         WritableByteChannel channel = Channels.newChannel(baos);
 
+        //Write magic if requested
+        baos.write(Constants.MAGIC);
+
         //Write the command type
         baos.write(getCommandType());
 
@@ -143,7 +146,7 @@
         //TODO error handeling
         channel.close();
 
-        int commandStart = 13;
+        int commandStart = startFrom + 13;
 
         //Get the entire command
         byte command[] = baos.toByteArray();
@@ -154,19 +157,13 @@
         checksum.update(command, commandStart, len);
 
         //Write the checksum
-        ByteBuffer.wrap(command, 1, 8).putLong(checksum.getValue());
+        ByteBuffer.wrap(command, startFrom + 1, 8).putLong(checksum.getValue());
 
         //Write the length of the command
-        ByteBuffer.wrap(command, 9, 4).putInt(len);
+        ByteBuffer.wrap(command, startFrom + 9, 4).putInt(len);
         return command;
     }
 
-    // TODO - how is marshal used differently than createPacket? Seems
-    // redundant.
-    // It apears from the usage that this is a test utility method that makes it
-    // easier to test writeExternal/readExternal.
-    // No problem with that but I'd prefer it be part of the test code instead
-    // of here.
     public byte[] marshal() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
         WritableByteChannel channel = Channels.newChannel(baos);
@@ -290,7 +287,7 @@
         } else {
             buffer.putInt(-1);
         }
-        
+
         buffer.flip();
         channel.write(buffer);
 
@@ -354,7 +351,7 @@
          * MarshalAware is requried so objects that get put into the cache can
          * take advantage of the faster way of serializaing objects, if they
          * don't want to then they can skip it or for simplicity's sake they
-         * can start with Serizliazable then implement MarshalAware if 
+         * can start with Serizliazable then implement MarshalAware if
          * serizlization performance becomes a concern.
          */
         if (object instanceof MarshalAware) {

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java Mon Oct  2 19:23:59 2006
@@ -91,13 +91,18 @@
 
         //TODO - This really should be done with a ByteBuffer pool
         ByteBuffer commandBuffer = ByteBuffer.allocate(length);
-        count = channel.read(commandBuffer);
+        bcr.reset(channel, commandBuffer);
+        count = bcr.readBuffer(length);
+        if (count < 0) {
+            channel.close();
+            //TODO - remove the peer as the socket was closed
+            return;
+        }
         if (count < length) {
             //Command was bad
             //TODO - Send back a resend?
+            return;
 
-            //Exit out of here
-//            break;
         }
 
         byte commandArray[] = commandBuffer.array();
@@ -116,11 +121,6 @@
         TCPCommandVisitor visitor = new TCPCommandVisitor(infoHolder, key);
         command.execute(visitor);
 
-        // Test for closed connection on the client side
-        if (count < 0) {
-            channel.close();
-            //TODO - remove the peer
-        }
     }
 
     private void disconnect(SocketChannel channel) throws IOException {

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Mon Oct  2 19:23:59 2006
@@ -18,26 +18,29 @@
  */
 package org.apache.geronimo.gcache.transports.tcp;
 
-import org.apache.geronimo.gcache.transports.CommandVisitor;
-import org.apache.geronimo.gcache.CacheInfoHolder;
-import org.apache.geronimo.gcache.command.RemoveSessionCommand;
-import org.apache.geronimo.gcache.command.RemoveEntryCommand;
-import org.apache.geronimo.gcache.command.PutSessionCommand;
-import org.apache.geronimo.gcache.command.PutEntryCommand;
-import org.apache.geronimo.gcache.command.MessageAckCommand;
-import org.apache.geronimo.gcache.command.GetCacheCommand;
-import org.apache.geronimo.gcache.command.ClearCacheCommand;
-import org.apache.geronimo.gcache.command.BulkSendCommand;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import net.sf.ehcache.Cache;
 import net.sf.ehcache.Element;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.BulkSendCommand;
+import org.apache.geronimo.gcache.command.ClearCacheCommand;
+import org.apache.geronimo.gcache.command.GetCacheCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.geronimo.gcache.command.PutEntryCommand;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.apache.geronimo.gcache.command.RemoveEntryCommand;
+import org.apache.geronimo.gcache.command.RemoveSessionCommand;
+import org.apache.geronimo.gcache.transports.CommandVisitor;
+import org.apache.geronimo.gcache.util.BufferChannelWriter;
 
-import java.util.Map;
-import java.util.Collections;
-import java.util.HashMap;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 public class TCPCommandVisitor implements CommandVisitor {
     Log log = LogFactory.getLog(TCPCommandVisitor.class);
@@ -102,6 +105,16 @@
         //Place the raw session in the cache
         try {
             cache.put(new Element(command.getSessionId(), command.getRawSessionFromPayload()));
+
+            //Ack the message
+            MessageAckCommand ack = new MessageAckCommand();
+            ack.setMessageId(command.getCommandId());
+            byte [] packet = ack.createPacket(true);
+            int written = sendPacket(packet);
+            if (written == -1) {
+                //TODO - This means the socket is dead and need peer removal
+            }
+
         } catch (IOException e) {
             log.info(e);
         }
@@ -155,5 +168,15 @@
     }
 
     public void processBulkSend(BulkSendCommand command) {
+    }
+
+    private int sendPacket(byte []packet) throws IOException {
+        //Only tests run without a key...on a real connection it should not be null
+        if (key == null) {
+            return 0;
+        }
+        SocketChannel target = (SocketChannel) key.channel();
+        BufferChannelWriter writer = new BufferChannelWriter(target, ByteBuffer.wrap(packet));
+        return writer.writeBuffer(packet.length);
     }
 }

Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java Mon Oct  2 19:23:59 2006
@@ -35,6 +35,12 @@
         buffer.flip();
     }
 
+    public void reset(SocketChannel channel, ByteBuffer buffer) { //Re-targets this reader at another channel/buffer pair
+        this.channel = channel;
+        this.buffer = buffer;
+        buffer.flip(); //NOTE(review): flip() on a freshly allocated buffer leaves limit == 0 - confirm timedRead() resets it
+    }
+
     public int readBuffer(int expectedCount) throws IOException {
         int read = timedRead(expectedCount);
 

Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java?view=auto&rev=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java Mon Oct  2 19:23:59 2006
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.util;
+
+import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+/**
+ * Writes the contents of a ByteBuffer to a SocketChannel, waiting up to
+ * <code>timeout</code> milliseconds for the socket to become writable again
+ * when the first attempt could not write everything.
+ */
+public class BufferChannelWriter {
+
+    private SocketChannel channel = null;
+    private ByteBuffer buffer = null;
+    private long timeout = 10000;
+
+    public BufferChannelWriter(SocketChannel channel, ByteBuffer buffer) {
+        this.channel = channel;
+        this.buffer = buffer;
+    }
+
+    /**
+     * Writes the buffer to the channel.
+     *
+     * @param expectedCount number of bytes the caller expects to be written
+     * @return bytes actually written (may be fewer than expected on timeout),
+     *         or -1 if the channel reported end-of-stream
+     * @throws IOException if the channel or the temporary selector fails
+     */
+    public int writeBuffer(int expectedCount) throws IOException {
+        return timedWrite(expectedCount);
+    }
+
+    private int timedWrite(int size) throws IOException {
+        int written = internalWrite();
+
+        //Leave if the socket is dead, we got the full tomato, or nothing is left
+        if (written < 0 || written >= size || !buffer.hasRemaining()) {
+            return written;
+        }
+
+        //Partial write...wait for the socket to accept more data
+        Selector selector = null;
+        SelectionKey key = null;
+        try {
+            //Keep the selector reference; select() below needs it
+            selector = Selector.open();
+            //Register for write readiness (requires a non-blocking channel)
+            key = channel.register(selector, SelectionKey.OP_WRITE);
+
+            if (selector.select(timeout) == 0) {
+                //Wakeup or timeout and still not writable...report what we managed
+                return written;
+            }
+
+            //Only 1 key is registered, so the channel is writable...write the rest
+            int moreWritten = internalWrite();
+            if (moreWritten < 0) {
+                return moreWritten;
+            }
+            written += moreWritten;
+        } finally {
+            if (key != null) {
+                key.cancel();
+            }
+            if (selector != null) {
+                selector.close();
+            }
+        }
+
+        return written;
+    }
+
+    //Writes until the channel accepts no more bytes; returns the total, or -1 on EOF
+    private int internalWrite() throws IOException {
+        int written = 0;
+        int totalBytes = 0;
+
+        do {
+            written = channel.write(buffer);
+            totalBytes += written;
+        } while (written > 0);
+
+        //A negative count means the socket is closed
+        return (written < 0) ? -1 : totalBytes;
+    }
+}

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
------------------------------------------------------------------------------
    svn:keywords = Date Revision

Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Mon Oct  2 19:23:59 2006
@@ -18,7 +18,12 @@
 
 import net.sf.ehcache.CacheManager;
 import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.util.BufferChannelReader;
 import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
 import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
 import org.apache.geronimo.gcache.server.spi.ThreadPool;
 import org.apache.geronimo.gcache.transports.TransportServer;
@@ -29,8 +34,14 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.Channels;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Arrays;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+import java.io.ByteArrayInputStream;
 
 public class TcpSocketServerTest {
 
@@ -58,10 +69,58 @@
         command.setPayloadFromSession(session);
 
         //Send the packet
-        ByteBuffer magic = ByteBuffer.wrap(Constants.MAGIC);
-        ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket());
-        channel.write(magic);
+        ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket(true));
         channel.write(commandBuffer);
+
+        //Now receive any data (it Should be a MessageAck)
+        ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13);
+        BufferChannelReader bcr = new BufferChannelReader(channel, receiveHeader);
+        int read = bcr.readBuffer(receiveHeader.capacity());
+        assert read == Constants.MAGIC.length + 13;
+        receiveHeader.flip();
+
+                //Read the magic
+        byte magic[] = new byte[Constants.MAGIC.length];
+        receiveHeader.get(magic);
+
+        //Better match the Magic
+        assert Arrays.equals(Constants.MAGIC, magic);
+
+        //Get the command
+        byte commandIdentifier = receiveHeader.get();
+
+        //Get the checksum
+        long checksum = receiveHeader.getLong();
+
+        //Get the command length
+        int length = receiveHeader.getInt();
+
+        commandBuffer = ByteBuffer.allocate(length);
+        bcr.reset(channel, commandBuffer);
+        int count = bcr.readBuffer(length);
+        assert count == length;
+
+        byte commandArray[] = commandBuffer.array();
+        Checksum calcChecksum = new CRC32();
+        calcChecksum.update(commandArray, 0, commandArray.length);
+        long newCheck  = calcChecksum.getValue();
+
+        //Checksums match
+        assert newCheck == checksum;
+
+        //Now create the command
+        ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
+        ReadableByteChannel readChannel = Channels.newChannel(bias);
+
+        //Create the command and unmarshal the data
+        Command ackCommand = CommandTypes.createCommand(commandIdentifier);
+        ackCommand.readExternal(readChannel);
+
+        //Is the message the type we think it is?
+        assert ackCommand instanceof MessageAckCommand;
+
+        //Is the ack for the original message?
+        assert command.getCommandId() == ((MessageAckCommand)ackCommand).getMessageId();
 
     }