You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/09/28 09:20:23 UTC
svn commit: r450728 - in /geronimo/sandbox/gcache/server/src:
main/java/org/apache/geronimo/gcache/
main/java/org/apache/geronimo/gcache/command/
main/java/org/apache/geronimo/gcache/transports/
main/java/org/apache/geronimo/gcache/transports/tcp/ main...
Author: jgenender
Date: Thu Sep 28 00:20:20 2006
New Revision: 450728
URL: http://svn.apache.org/viewvc?view=rev&rev=450728
Log:
Begin communicating with the cache through the TCPTransport
Added:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java (with props)
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java (contents, props changed)
- copied, changed from r448152, geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java (with props)
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java (with props)
Removed:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java
Modified:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java Thu Sep 28 00:20:20 2006
@@ -24,8 +24,8 @@
import org.apache.geronimo.gcache.server.listeners.DefaultCacheNotifier;
public class CacheInfoHolder {
- public final CacheManager cacheManager;
- public CacheNotifier cacheNotifier = null;
+ private final CacheManager cacheManager;
+ private CacheNotifier cacheNotifier = null;
public CacheInfoHolder(CacheManager cacheManager) {
this.cacheManager = cacheManager;
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/ThreadSupport.java Thu Sep 28 00:20:20 2006
@@ -37,8 +37,8 @@
execute();
}
- running = false;
cleanUp();
+ running = false;
}
protected abstract void initialize();
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java Thu Sep 28 00:20:20 2006
@@ -18,25 +18,32 @@
*/
package org.apache.geronimo.gcache.command;
-import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.transports.tcp.Constants;
import org.apache.geronimo.gcache.util.ByteArrayInputStream;
+import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
import org.apache.geronimo.gcache.util.UniqueId;
-import org.apache.geronimo.gcache.marshal.MarshalAware;
-import org.apache.geronimo.gcache.CacheInfoHolder;
import java.io.IOException;
-import java.io.Serializable;
-import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.nio.channels.Channels;
import java.nio.charset.Charset;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
public class BaseCommand implements Command {
private long commandId = 0;
+ private Checksum checksum = null;
+
+ public byte getCommandType() throws IOException {
+ throw new IOException("Invalid command type");
+ }
public long getCommandId() {
if (commandId == 0){
@@ -67,6 +74,45 @@
commandId = UniqueId.get();
}
writeLong(channel, commandId);
+ }
+
+ public byte[] createPacket() throws IOException{
+ // COMMAND TYPE - 1 byte
+ // CHECKSUM - 8 bytes
+ // COMMAND LENGTH - 4 bytes
+ // COMMAND - ? bytes as defined above/
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
+ WritableByteChannel channel = Channels.newChannel(baos);
+
+ //Write the command type
+ baos.write(getCommandType());
+
+ //Place hold the checksum
+ baos.write(new byte[] {0,0,0,0,0,0,0,0});
+
+ //Place hold the length
+ baos.write(new byte[] {0,0,0,0});
+
+ //Marshal the command
+ writeExternal(channel);
+ channel.close();
+
+ int commandStart = 13;
+
+ //Get the entire command
+ byte command[] = baos.toByteArray();
+
+ int len = command.length - commandStart;
+
+ Checksum checksum = new CRC32();
+ checksum.update(command, commandStart, len);
+
+ //Write the checksum
+ ByteBuffer.wrap(command, 1, 8).putLong(checksum.getValue());
+
+ //Write the length of the command
+ ByteBuffer.wrap(command, 9, 4).putInt(len);
+ return command;
}
public byte[] marshal() throws IOException {
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java Thu Sep 28 00:20:20 2006
@@ -28,6 +28,10 @@
private int numberOfCommands;
+ public byte getCommandType() throws IOException {
+ return CommandTypes.BULK_SEND_COMMAND;
+ }
+
public int getNumberOfCommands() {
return numberOfCommands;
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java Thu Sep 28 00:20:20 2006
@@ -84,7 +84,7 @@
//Process what we want read
cacheName = readString(channel);
- key = this.readBytes(channel);
+ key = readBytes(channel);
sessionId = readString(channel);
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/ClearCacheCommand.java Thu Sep 28 00:20:20 2006
@@ -25,6 +25,10 @@
public class ClearCacheCommand extends CacheBaseCommand {
+ public byte getCommandType() throws IOException {
+ return CommandTypes.CLEARCACHE_COMMAND;
+ }
+
public void execute(CacheInfoHolder info) throws IOException {
Cache cache = info.getCache(getCacheName(), true);
cache.removeAll();
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/Command.java Thu Sep 28 00:20:20 2006
@@ -28,7 +28,9 @@
* invoked upon reciept.
*/
public interface Command extends MarshalAware {
- long getCommandId();
+ public long getCommandId();
- void execute(CacheInfoHolder info) throws IOException;
+ public byte getCommandType() throws IOException;
+
+ public void execute(CacheInfoHolder info) throws IOException;
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CommandTypes.java Thu Sep 28 00:20:20 2006
@@ -1,13 +1,14 @@
package org.apache.geronimo.gcache.command;
public class CommandTypes {
- public static final int PUT_ENTRY_COMMAND = 1;
- public static final int REMOVE_ENTRY_COMMAND = 2;
- public static final int CLEARCACHE_COMMAND = 3;
- public static final int MESSAGE_ACK_COMMAND = 4;
- public static final int BULK_SEND_COMMAND = 5;
- public static final int REMOVE_SESSION_COMMAND = 6;
- public static final int PUT_SESSION_COMMAND = 7;
+ public static final byte PUT_ENTRY_COMMAND = 1;
+ public static final byte REMOVE_ENTRY_COMMAND = 2;
+ public static final byte CLEARCACHE_COMMAND = 3;
+ public static final byte MESSAGE_ACK_COMMAND = 4;
+ public static final byte BULK_SEND_COMMAND = 5;
+ public static final byte REMOVE_SESSION_COMMAND = 6;
+ public static final byte PUT_SESSION_COMMAND = 7;
+ public static final byte GET_CACHE_COMMAND = 8;
public static Command createCommand(int identifier) {
Command command = null;
@@ -32,6 +33,9 @@
break;
case(PUT_SESSION_COMMAND):
command = new PutSessionCommand();
+ break;
+ case(GET_CACHE_COMMAND):
+ command = new GetCacheCommand();
break;
}
return command;
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/GetCacheCommand.java Thu Sep 28 00:20:20 2006
@@ -25,15 +25,19 @@
import net.sf.ehcache.Cache;
+/**
+ * This class represents a request to obtain a copy of an
+ * entire cache. The sender should set the cache name, and
+ * the receiver will acknowledge this by sending a BulkSendCommand
+ * back to the client, followed by PutSessionCommands (for sessions)
+ * and PutEntryCommands for non-Session data.
+ */
public class GetCacheCommand extends CacheBaseCommand{
- public void execute(CacheInfoHolder info) throws IOException {
- //TODO - Do we lock the Cache???
- Cache cache = info.getCache(getCacheName(), true);
-
- List list = cache.getKeys();
-
- int size = list.size();
+ public byte getCommandType() throws IOException {
+ return CommandTypes.GET_CACHE_COMMAND;
+ }
+ public void execute(CacheInfoHolder info) throws IOException {
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java Thu Sep 28 00:20:20 2006
@@ -28,6 +28,10 @@
private long messageId = 0;
+ public byte getCommandType() throws IOException {
+ return CommandTypes.MESSAGE_ACK_COMMAND;
+ }
+
public long getMessageId() {
return messageId;
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutEntryCommand.java Thu Sep 28 00:20:20 2006
@@ -30,6 +30,10 @@
public class PutEntryCommand extends CachePayloadBaseCommand {
+ public byte getCommandType() throws IOException {
+ return CommandTypes.PUT_ENTRY_COMMAND;
+ }
+
public void execute(CacheInfoHolder info) throws IOException {
Cache cache = info.getCache(getCacheName(), true);
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java Thu Sep 28 00:20:20 2006
@@ -40,6 +40,10 @@
private int entryCount;
+ public byte getCommandType() throws IOException {
+ return CommandTypes.PUT_SESSION_COMMAND;
+ }
+
public void readExternal(ReadableByteChannel channel) throws IOException {
super.readExternal(channel);
entryCount = readInt(channel);
@@ -264,6 +268,6 @@
cache.put(new Element(getSessionId(), getRawSessionFromPayload()));
//Notify peers
- info.cacheNotifier.notifyPutSession(this);
+ info.getCacheNotifier().notifyPutSession(this);
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveEntryCommand.java Thu Sep 28 00:20:20 2006
@@ -30,6 +30,11 @@
import net.sf.ehcache.Cache;
public class RemoveEntryCommand extends CacheBaseCommand{
+
+ public byte getCommandType() throws IOException {
+ return CommandTypes.REMOVE_ENTRY_COMMAND;
+ }
+
public void execute(CacheInfoHolder info) throws IOException {
Cache cache = info.getCache(getCacheName(), true);
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/RemoveSessionCommand.java Thu Sep 28 00:20:20 2006
@@ -25,6 +25,10 @@
public class RemoveSessionCommand extends CacheBaseCommand{
+ public byte getCommandType() throws IOException {
+ return CommandTypes.REMOVE_SESSION_COMMAND;
+ }
+
public void execute(CacheInfoHolder info) throws IOException {
Cache cache = info.getCache(getCacheName(), true);
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java?view=auto&rev=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java Thu Sep 28 00:20:20 2006
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.transports;
+
+import java.io.IOException;
+
+public interface TransportServer {
+
+ public void start() throws IOException;
+ public void stop() throws IOException;
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/TransportServer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java Thu Sep 28 00:20:20 2006
@@ -18,57 +18,108 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.util.BufferChannelInputStream;
+import org.apache.geronimo.gcache.util.BufferChannelReader;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.geronimo.gcache.command.BaseCommand;
+
import java.io.IOException;
+import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.Channels;
import java.util.Arrays;
-
-import org.apache.geronimo.gcache.command.Command;
-import org.apache.geronimo.gcache.command.CommandTypes;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
public class DefaultSelectionKeyProcessor implements SelectionKeyProcessor {
- private final static int HEADER_SIZE = Constants.MAGIC.length + 4;
+ private final static int HEADER_SIZE = Constants.MAGIC.length + 1 + 8 + 4;
+ private CacheInfoHolder infoHolder = null;
+
+ public DefaultSelectionKeyProcessor(CacheInfoHolder infoHolder) {
+ this.infoHolder = infoHolder;
+ }
public void process(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
- // first byte is the identifier, the next 4 are the checksum
- ByteBuffer buffer = ByteBuffer.allocateDirect(HEADER_SIZE);
- int count;
- while ((count = channel.read(buffer)) > 0) {
- //Better at least have the size of the HEADER
- if (count < HEADER_SIZE){
- disconnect(channel);
- return;
- }
-
- buffer.flip();
-
- byte magic[] = new byte[Constants.MAGIC.length];
- buffer.get(magic);
-
- //Better match the Magic
- if (!Arrays.equals(Constants.MAGIC, magic)){
- disconnect(channel);
- return;
- }
-
- int commandIdentifier = buffer.getInt();
- Command command = CommandTypes.createCommand(commandIdentifier);
- command.readExternal(channel);
- //TODO execute the command
-// command.execute();
+ // HEADER BREAKDOWN
+ // ---------------------------------------------------------
+ // MAGIC HEADER - 6 bytes
+ // COMMAND - 1 byte
+ // CHECKSUM - 8 bytes
+ // COMMAND LENGTH - 4 bytes
+ // COMMAND - ? bytes as defined above
+ ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE);
+
+ BufferChannelReader bcr = new BufferChannelReader(channel, buffer);
+ int count = bcr.readBuffer(HEADER_SIZE);
+ if (count < HEADER_SIZE) {
+ //Nope...we are outta here
+ disconnect(channel);
+ return;
+ }
+ buffer.flip();
+
+ //Read the magic
+ byte magic[] = new byte[Constants.MAGIC.length];
+ buffer.get(magic);
+
+ //Better match the Magic
+ if (!Arrays.equals(Constants.MAGIC, magic)) {
+ //Magic doesn't match, so close the socket
+ disconnect(channel);
+ return;
}
- // Test for closed connection
+ //Get the command
+ byte commandIdentifier = buffer.get();
+
+ //Get the checksum
+ long checksum = buffer.getLong();
+
+ //Get the command length
+ int length = buffer.getInt();
+
+ //TODO - This really should be done with a ByteBuffer pool
+ ByteBuffer commandBuffer = ByteBuffer.allocate(length);
+ count = channel.read(commandBuffer);
+ if (count < length) {
+ //Command was bad
+ //TODO - Send back a resend?
+
+ //Exit out of here
+// break;
+ }
+
+ byte commandArray[] = commandBuffer.array();
+ Checksum calcChecksum = new CRC32();
+ calcChecksum.update(commandArray, 0, commandArray.length);
+ long newCheck = calcChecksum.getValue();
+
+ ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
+ ReadableByteChannel readChannel = Channels.newChannel(bias);
+
+ //Create the command and unmarshal the data
+ Command command = CommandTypes.createCommand(commandIdentifier);
+ command.readExternal(readChannel);
+
+ //Do something with the data
+ command.execute(infoHolder);
+
+ // Test for closed connection on the client side
if (count < 0) {
channel.close();
+ //TODO - remove the peer
}
}
- private void disconnect(SocketChannel channel) throws IOException{
+ private void disconnect(SocketChannel channel) throws IOException {
try {
channel.socket().shutdownOutput();
} catch (IOException e) {
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java Thu Sep 28 00:20:20 2006
@@ -38,17 +38,10 @@
private Selector selector = null;
private int timeOut = 0;
- public TCPSocketHandler(int timeOut) throws IOException{
- this(timeOut, null);
- }
-
public TCPSocketHandler(int timeOut, SelectionKeyProcessor processor) throws IOException{
this.timeOut = timeOut;
- if (processor == null)
- this.processor = new DefaultSelectionKeyProcessor();
- else
- this.processor = processor;
+ this.processor = processor;
selector = Selector.open();
}
@@ -91,10 +84,23 @@
processor.process(key);
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
- } catch (InterruptedException e) {
- log.error("InterruptedException occured", e);
+ }
+ }
+
+ public void halt() {
+ super.halt();
+
+ //Wake up the selector if its in a wait state
+ selector.wakeup();
+
+ //Wait until everything is shutdown
+ while (isRunning()){
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
}
}
Copied: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java (from r448152, geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java)
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java?view=diff&rev=450728&p1=geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java&r1=448152&p2=geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServer.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java Thu Sep 28 00:20:20 2006
@@ -17,6 +17,7 @@
package org.apache.geronimo.gcache.transports.tcp;
import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.transports.TransportServer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SelectionKey;
@@ -24,7 +25,7 @@
import java.net.ServerSocket;
import java.io.IOException;
-public class TCPSocketServer {
+public class TCPSocketTransportServer implements TransportServer {
private ServerSocketChannel server = null;
private InetSocketAddress inet = null;
@@ -33,11 +34,7 @@
private int timeOut = 0;
private SelectionKeyProcessor processor;
- public TCPSocketServer(String address, int port, ThreadPool threadPool, int timeOut) {
- this(address, port, threadPool, timeOut, null);
- }
-
- public TCPSocketServer(String address, int port, ThreadPool threadPool, int timeOut, SelectionKeyProcessor processor) {
+ public TCPSocketTransportServer(String address, int port, ThreadPool threadPool, int timeOut, SelectionKeyProcessor processor) {
inet = new InetSocketAddress(address, port);
pool = threadPool;
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java?view=auto&rev=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java Thu Sep 28 00:20:20 2006
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.util;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.Channel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+
+/**
+ * Provides an input stream to read a socket channel and fill a buffer
+ */
+public class BufferChannelInputStream extends InputStream {
+
+ private SocketChannel channel = null;
+ private ByteBuffer buffer = null;
+ private long timeout = 10000;
+
+ public BufferChannelInputStream(SocketChannel channel, ByteBuffer buffer) {
+ this.channel = channel;
+ this.buffer = buffer;
+ buffer.flip();
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * Read's next byte of data
+ *
+ * @return next byte of data or -1 if it cannot read
+ * @throws IOException
+ */
+ public int read() throws IOException {
+
+ //Have we read everything?
+ if (!buffer.hasRemaining()) {
+ //Yep...so refill the buffer
+ if (timedRead() <= 0)
+ return -1;
+ }
+
+ return (int) (buffer.get() & 0xff);
+ }
+
+ public long readLong() throws IOException {
+ //Have we read everything?
+ if (!buffer.hasRemaining()) {
+ //Yep...so refill the buffer
+ if (timedRead() <= 0)
+ return -1;
+ }
+
+ if (buffer.remaining() >= 8){
+ return buffer.getLong();
+ } else {
+ throw new BufferUnderflowException();
+ }
+ }
+
+ public int readInt() throws IOException {
+ //Have we read everything?
+ if (!buffer.hasRemaining()) {
+ //Yep...so refill the buffer
+ if (timedRead() <= 0)
+ return -1;
+ }
+
+ if (buffer.remaining() >= 4){
+ return buffer.getInt();
+ } else {
+ throw new BufferUnderflowException();
+ }
+ }
+
+ public int read(byte[] bytes) throws IOException {
+ return this.read(bytes, 0, bytes.length);
+ }
+
+ public int read(byte[] bytes, int offset, int len) throws IOException {
+ //Have we read everything?
+ if (!buffer.hasRemaining()) {
+ //Yep...so refill the buffer
+ if (timedRead() <= 0)
+ return -1;
+ }
+
+ if (len > buffer.remaining()) {
+ len = buffer.remaining();
+ }
+ buffer.get(bytes, offset, len);
+
+ return (len);
+
+ }
+
+ private int timedRead() throws IOException {
+ //Reset the buffer
+ buffer.clear();
+ int read = internalRead();
+
+ //Test if anything came inbound...
+ if (read == 0) {
+ //Nope...now need to wait
+ Selector selector = null;
+ SelectionKey key = null;
+ try{
+ Selector.open();
+
+ //Listen in for data on the line
+ channel.register(selector, SelectionKey.OP_READ);
+
+ int retVal = selector.select(timeout);
+ if (retVal == 0) {
+ //Hmmm...wakeup or timeout and nothing found?
+ return 0;
+ }
+
+ //retVal must equal 1, because there is only 1 key, so read it
+ read = internalRead();
+ }finally{
+ if (key != null){
+ key.cancel();
+ }
+ if (selector != null){
+ selector.close();
+ }
+ }
+ }
+
+ buffer.flip();
+ return read;
+ }
+
+ private int internalRead() throws IOException {
+
+ int read = 0;
+ int totalBytes = 0;
+
+ do {
+ read = channel.read(buffer);
+ totalBytes += read;
+ } while (read > 0);
+
+ //Test for a closed socket
+ if (read < 0) {
+ //Return with EOF
+ return -1;
+ }
+
+ return totalBytes;
+ }
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelInputStream.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java?view=auto&rev=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java Thu Sep 28 00:20:20 2006
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.util;
+
+import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+public class BufferChannelReader {
+ private SocketChannel channel = null;
+ private ByteBuffer buffer = null;
+ private long timeout = 10000;
+
+ public BufferChannelReader(SocketChannel channel, ByteBuffer buffer) {
+ this.channel = channel;
+ this.buffer = buffer;
+ buffer.flip();
+ }
+
+ public int readBuffer(int expectedCount) throws IOException {
+ int read = timedRead(expectedCount);
+
+ return read;
+ }
+
+ private int timedRead(int size) throws IOException {
+ //Reset the buffer
+ buffer.clear();
+ int read = internalRead();
+
+ //If we got the full tomato, then leave
+ if (read == size)
+ return read;
+
+ //Test if anything came inbound or we have more bytes that need reading...
+ if ((read == 0) || (read < size)) {
+
+ //Nope...now need to wait
+ Selector selector = null;
+ SelectionKey key = null;
+ try {
+ Selector.open();
+
+ //Listen in for data on the line
+ channel.register(selector, SelectionKey.OP_READ);
+
+ int retVal = selector.select(timeout);
+ if (retVal == 0) {
+ //Hmmm...wakeup or timeout and nothing found?
+ return 0;
+ }
+
+ //retVal must equal 1, because there is only 1 key, so read it
+ int moreRead = internalRead();
+ //Error?
+ if (moreRead < 0)
+ return moreRead;
+
+ read += moreRead;
+
+ } finally {
+ if (key != null) {
+ key.cancel();
+ }
+ if (selector != null) {
+ selector.close();
+ }
+ }
+ }
+
+ return read;
+ }
+
+ private int internalRead() throws IOException {
+
+ int read = 0;
+ int totalBytes = 0;
+
+ do {
+ read = channel.read(buffer);
+ totalBytes += read;
+ } while (read > 0);
+
+ //Test for a closed socket
+ if (read < 0) {
+ //Return with EOF
+ return -1;
+ }
+
+ return totalBytes;
+ }
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java?view=diff&rev=450728&r1=450727&r2=450728
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Thu Sep 28 00:20:20 2006
@@ -16,24 +16,70 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
-import org.testng.annotations.Test;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import net.sf.ehcache.CacheManager;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
+import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.transports.TransportServer;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
public class TcpSocketServerTest {
-
+
+ private static final int port = 45678;
+ private static final String host = "localhost";
+
+ TransportServer server = null;
+
@Test()
- public void runServer() throws Exception {
+ public void sendSession() throws Exception {
- /**
- ThreadPool pool = new DefaultThreadPoolImpl(10);
- MockSelectionKeyProcessor mock = new MockSelectionKeyProcessor();
- //TCPSocketServer server = new TCPSocketServer("localhost", 45678, pool, 2000, mock);
- TCPSocketServer server = new TCPSocketServer("localhost", 45678, pool, 2000);
- server.start();
- Thread.sleep(100000);
- server.stop();
- **/
+ //Create a client socket
+ SocketChannel channel = SocketChannel.open();
+ channel.connect(new InetSocketAddress(host, port));
+
+ //Create a session
+ Map session = new HashMap();
+ session.put("key1","data1");
+ session.put("key2","data2");
+ session.put("key3","data3");
+ PutSessionCommand command = new PutSessionCommand();
+ command.setCacheName("Cache1");
+ command.setSessionId("Session1");
+ command.setPayloadFromSession(session);
+
+ //Send the packet
+ ByteBuffer magic = ByteBuffer.wrap(Constants.MAGIC);
+ ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket());
+ channel.write(magic);
+ channel.write(commandBuffer);
}
+
+
+ @BeforeSuite
+ public void setUp() throws Exception{
+ ThreadPool pool = new DefaultThreadPoolImpl(10);
+ CacheManager mgr = CacheManager.create();
+ CacheInfoHolder info = new CacheInfoHolder(mgr);
+ DefaultSelectionKeyProcessor kp = new DefaultSelectionKeyProcessor(info);
+
+ server = new TCPSocketTransportServer(host, port, pool, 2000, kp);
+
+ server.start();
+ }
+
+ @AfterSuite(alwaysRun=true)
+ public void shutdown() throws Exception{
+ server.stop();
+ }
+
}