You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/19 18:12:53 UTC
svn commit: r465670 [1/2] - in /geronimo/sandbox/gcache/server: ./
src/main/java/org/apache/geronimo/gcache/command/
src/main/java/org/apache/geronimo/gcache/marshal/
src/main/java/org/apache/geronimo/gcache/server/
src/main/java/org/apache/geronimo/gc...
Author: jgenender
Date: Thu Oct 19 09:12:51 2006
New Revision: 465670
URL: http://svn.apache.org/viewvc?view=rev&rev=465670
Log:
Add MINA
Added:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java (with props)
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java (with props)
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java (with props)
Removed:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/impl/
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/listeners/RegisteredListeners.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/spi/
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessorFactory.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/SelectionKeyProcessor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/SelectionKeyProcessorFactory.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/MockSelectionKeyProcessor.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/MockSelectionKeyProcessorFactory.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketServerTest.java
Modified:
geronimo/sandbox/gcache/server/pom.xml
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/marshal/MarshalAware.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheServer.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/config/Configuration.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/core/GCacheThread.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/BulkSendCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CacheBaseCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/MessageAckCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/PutSessionCommandTest.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
Modified: geronimo/sandbox/gcache/server/pom.xml
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/pom.xml?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/pom.xml (original)
+++ geronimo/sandbox/gcache/server/pom.xml Thu Oct 19 09:12:51 2006
@@ -30,6 +30,13 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>nlog4j</artifactId>
+ <version>1.2.25</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.testng</groupId>
@@ -46,6 +53,11 @@
<dependency>
<groupId>org.apache.geronimo.gcache</groupId>
<artifactId>openwire</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-core</artifactId>
</dependency>
</dependencies>
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java Thu Oct 19 09:12:51 2006
@@ -22,13 +22,10 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.nio.charset.Charset;
-import java.util.zip.CRC32;
-import java.util.zip.Checksum;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
import org.apache.geronimo.gcache.marshal.MarshalAware;
import org.apache.geronimo.gcache.transports.CommandVisitor;
@@ -36,6 +33,8 @@
import org.apache.geronimo.gcache.util.ByteArrayInputStream;
import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
import org.apache.geronimo.gcache.util.UniqueId;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.util.CharsetUtil;
public class BaseCommand implements Command {
@@ -46,332 +45,264 @@
private long commandId = 0;
/**
- * Get the command type. Types are integers and are used by gcache to create
- * a new instance of the correct command when the command arrives at its destination.
+ * Get the command type. Types are integers and are used by gcache to
+ * create a new instance of the correct command when the command arrives
+ * at its destination.
*/
public byte getCommandType() throws IOException {
- throw new IOException("Invalid command type - subclasses must over ride getCommandType");
+ throw new IOException(
+ "Invalid command type - subclasses must over ride getCommandType");
}
/**
* The command id is used to uniquely identify this command and is
- * specifically used in the case where a response is required.
- * If the command id is still set to the default 0 a unique id is generated by the
- * UniqueId class.
+ * specifically used in the case where a response is required. If the
+ * command id is still set to the default 0 a unique id is generated by
+ * the UniqueId class.
+ *
* @see org.apache.geronimo.gcache.util.UniqueId
*/
public long getCommandId() {
- if (commandId == 0){
- commandId = UniqueId.get();
- }
- return commandId;
+ if (commandId == 0) {
+ commandId = UniqueId.get();
+ }
+ return commandId;
}
/**
- * This method is provided for the cases where a specfic id is required to
- * coordinate with another (external) framework. Typical use will not
+ * This method is provided for the cases where a specific id is required
+ * to coordinate with another (external) framework. Typical use will not
* require this method to be called, the default mechanism will suffice.
- *
+ *
* @param commandId
*/
public void setCommandId(long commandId) {
- //TODO - should this method be here. I am leaning towards this being a read only property.
- this.commandId = commandId;
+ // TODO - should this method be here. I am leaning towards this being a
+ // read only property.
+ this.commandId = commandId;
}
-
/**
* This default implementation does nothing, subclasses will perform
* whatever task this command is supposed to do.
*/
- public void execute(CommandVisitor visitor) throws IOException{
- // nothing to do in the base
+ public void execute(CommandVisitor visitor) throws IOException {
+ // nothing to do in the base
}
/**
* @see org.apache.geronimo.gcache.marshal.MarshalAware.readExternal(ReadableByteChannel)
*/
- public void readExternal(ReadableByteChannel channel) throws IOException {
- // this is the root so no super impl, others should call super first
- commandId = readLong(channel);
-
+ public void readExternal(ByteBuffer buffer) throws IOException {
+ // this is the root so no super impl, others should call super first
+ commandId = buffer.getLong();
}
/**
* @see org.apache.geronimo.gcache.marshal.MarshalAware.writeExternal(WritableByteChannel)
*/
- public void writeExternal(WritableByteChannel channel) throws IOException {
- // this is the root so no super impl, others should call super first
+ public void writeExternal(ByteBuffer buffer) throws IOException {
+ // this is the root so no super impl, others should call super first
- if (commandId == 0){
- commandId = UniqueId.get();
- }
- writeLong(channel, commandId);
+ if (commandId == 0) {
+ commandId = UniqueId.get();
+ }
+
+ buffer.putLong(commandId);
}
/**
- * Create a byte array to hold the data for this command. Header information
- * (the first 13 bytes) are written first The first byte is the command
- * type, the second 8 bytes are a checksum, the next 4 bytes are the command
- * length. The remainder of the bytes will be the result of the
- * writeExternal implementation of the command.
- *
+ * Create a byte array to hold the data for this command. Header
+ * information (the first 5 bytes, after the optional magic prefix) is
+ * written first. The first byte is the command type and the next 4 bytes
+ * are the command length. The remainder of the bytes will be the result
+ * of the writeExternal implementation of the command.
+ *
* @return
* @throws IOException
*/
- public byte[] createPacket(boolean includeMagic) throws IOException{
- // COMMAND TYPE - 1 byte
- // CHECKSUM - 8 bytes
- // COMMAND LENGTH - 4 bytes
- // COMMAND - ? bytes as defined above/
- int startFrom = includeMagic ? Constants.MAGIC.length : 0;
+ public byte[] createPacket(boolean includeMagic) throws IOException {
+
+ ByteBuffer buffer = null;
- ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
- WritableByteChannel channel = Channels.newChannel(baos);
+ try {
+ buffer = ByteBuffer.allocate(256, false);
+ buffer.setAutoExpand(true);
+
+ // COMMAND TYPE - 1 byte
+ // COMMAND LENGTH - 4 bytes
+ // COMMAND - ? bytes as defined above/
+ int startFrom = includeMagic ? Constants.MAGIC.length : 0;
+
+ // Write magic if requested
+ if (includeMagic)
+ buffer.put(Constants.MAGIC);
+
+ // Write the command type
+ buffer.put(getCommandType());
+
+ // Mark the length position
+ buffer.mark();
+
+ // Place hold the length
+ buffer.putInt(0);
+
+ // Marshal the command
+ writeExternal(buffer);
+
+ int commandStart = startFrom + 5;
+
+ int pos = buffer.position();
+ int len = pos - commandStart;
+
+ buffer.reset();
+ // Write the length of the command
+ buffer.putInt(len);
+ buffer.position(pos);
+ buffer.limit(pos);
+
+ buffer.flip();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ WritableByteChannel channel = Channels.newChannel(baos);
+ channel.write(buffer.buf());
+ return baos.toByteArray();
+ } finally {
+ if (buffer != null)
+ buffer.release();
- //Write magic if requested
- baos.write(Constants.MAGIC);
+ }
+ }
+
+ public byte[] marshal() throws IOException {
+ ByteBuffer buffer = null;
+ try {
+ buffer = ByteBuffer.allocate(4096, true);
+ buffer.setAutoExpand(true);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
+ WritableByteChannel channel = Channels.newChannel(baos);
+ writeExternal(buffer);
+
+ buffer.limit(buffer.position());
+ buffer.flip();
+
+ channel.write(buffer.buf());
+ channel.close();
+
+ return baos.toByteArray();
+ } finally {
- //Write the command type
- baos.write(getCommandType());
+ }
+ }
- //Place hold the checksum
- baos.write(new byte[] {0,0,0,0,0,0,0,0});
+ /**
+ * Rebuild an object from an array of bytes.
+ *
+ * @param data
+ * @return
+ * @throws IOException
+ */
+ // TODO - this method hoses MarshalAware - IMO this is not the place for
+ // this method, its a utility method
+ public static Object convertObjectFromBytes(byte data[]) throws IOException {
+ if (data == null)
+ return null;
- //Place hold the length
- baos.write(new byte[] {0,0,0,0});
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ try {
+ return ois.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
- //Marshal the command
- writeExternal(channel);
- //TODO error handeling
- channel.close();
+ protected String readString(ByteBuffer buffer) throws IOException {
- int commandStart = startFrom + 13;
+ CharsetDecoder decoder = CharsetUtil.getDefaultCharset().newDecoder();
+ byte nullFlag = buffer.get();
+ if (nullFlag == 0)
+ return null;
- //Get the entire command
- byte command[] = baos.toByteArray();
+ String str = buffer.getString(decoder);
- int len = command.length - commandStart;
+ // String str = Charset.defaultCharset().decode(buffer).toString();
- Checksum checksum = new CRC32();
- checksum.update(command, commandStart, len);
+ return str;
+ }
+
+ protected void writeString(ByteBuffer buffer, String str)
+ throws IOException {
+
+ if (str == null){
+ buffer.put((byte)0);
+ return;
+ } else {
+ buffer.put((byte)1);
+ }
- //Write the checksum
- ByteBuffer.wrap(command, startFrom + 1, 8).putLong(checksum.getValue());
+ CharsetEncoder encoder = CharsetUtil.getDefaultCharset().newEncoder();
+ buffer.putString(str, encoder);
+
+ //Null terminate
+ buffer.put((byte)0);
- //Write the length of the command
- ByteBuffer.wrap(command, startFrom + 9, 4).putInt(len);
- return command;
}
+
+ protected byte[] readBytes(ByteBuffer buffer) {
- public byte[] marshal() throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
- WritableByteChannel channel = Channels.newChannel(baos);
- writeExternal(channel);
- channel.close();
- return baos.toByteArray();
+ // Read size
+ int byteCount = buffer.getInt();
+
+ if (byteCount == -1)
+ return null;
+
+ byte bytes[] = new byte[byteCount];
+ buffer.get(bytes);
+
+ return bytes;
}
- /**
- * Rebuld an object from an array of bytes.
- * @param data
- * @return
- * @throws IOException
- */
- //TODO - this method hoses MarshalAware - IMO this is not the place for this method, its a utility method
- public static Object convertObjectFromBytes(byte data[]) throws IOException {
- if (data == null)
- return null;
+ protected void writeBytes(ByteBuffer buffer, byte[] bytes)
+ throws IOException {
+
+ if (bytes == null){
+ buffer.putInt(-1);
+ return;
+ }
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- ObjectInputStream ois = new ObjectInputStream(bais);
- try {
- return ois.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException(e.getMessage());
- }
- }
-
- protected int readInt(ReadableByteChannel channel) throws IOException {
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- int read = channel.read(buffer);
- int intValue = 0;
- if (read == 4) {
- buffer.flip();
- intValue = buffer.getInt();
- } else {
- throw new IOException(
- "Could not read an int from the channel - not enough data -"
- + " expected length = 4, actual length = " + read);
- }
-
- return intValue;
- }
-
- protected void writeInt(WritableByteChannel channel, int intValue) throws IOException {
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- buffer.putInt(intValue);
-
- buffer.flip();
- channel.write(buffer);
-
- }
-
- protected long readLong(ReadableByteChannel channel) throws IOException {
- ByteBuffer buffer = ByteBuffer.allocateDirect(8);
- int read = channel.read(buffer);
- long longValue = 0;
- if (read == 8) {
- buffer.flip();
- longValue = buffer.getLong();
- } else {
- throw new IOException(
- "Could not read a long from the channel - not enough data -"
- + " expected length = 8, actual length = " + read);
- }
-
- return longValue;
- }
-
- protected void writeLong(WritableByteChannel channel, long longValue) throws IOException {
-
- // first the length of the string
- ByteBuffer buffer = ByteBuffer.allocateDirect(8);
- buffer.putLong(longValue);
-
- buffer.flip();
- channel.write(buffer);
-
- }
-
- protected String readString(ReadableByteChannel channel) throws IOException {
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- int read = channel.read(buffer);
- int stringLength = 0;
- if (read == 4) {
- buffer.flip();
- stringLength = buffer.getInt();
- } else {
- throw new IOException(
- "Could not read the string length from the channel - not enough data -"
- + " expected length = 4, actual length = " + read);
- }
- if (stringLength == -1)
- return null;
-
- buffer = null;
- // should read stringLength bytes from the channel to get the string
- buffer = ByteBuffer.allocateDirect(stringLength);
- read = channel.read(buffer);
- if (read != stringLength) {
- throw new IOException(
- "Could not read the string from the channel - expected "
- + stringLength + " bytes but got " + read + ".");
- }
- buffer.flip();
- //TODO - not sure which character set to use so going with default but that will
- // not work in case where two machines have different charsets as default so
- // this needs to be revisited at some point
- String str = Charset.defaultCharset().decode(buffer).toString();
- buffer = null;
-
- return str;
- }
-
- protected void writeString(WritableByteChannel channel, String str) throws IOException {
-
- // first the length of the string
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- if (str != null) {
- buffer.putInt(str.length());
- } else {
- buffer.putInt(-1);
- }
-
- buffer.flip();
- channel.write(buffer);
-
- // now write the string - buffer comes back flipped and ready to be written to the channel
- if (str != null){
- buffer = Charset.defaultCharset().encode(str);
- channel.write(buffer);
- }
- }
-
- protected byte[] readBytes(ReadableByteChannel channel) throws IOException {
-
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
-
- //Read size
- int read = channel.read(buffer);
- int byteCount = 0;
- if (read == 4) {
- buffer.flip();
- byteCount = buffer.getInt();
- } else {
- throw new IOException(
- "Could not read bytes length from the channel - "
- + " expected 4 bytes but got " + read + ".");
- }
- buffer = null;
-
- if (byteCount == -1)
- return null;
-
- //Read the bytes
- buffer = ByteBuffer.allocate(byteCount);
- read = channel.read(buffer);
- if (read != byteCount) {
- throw new IOException(
- "Could not read bytes from the channel - expected "
- + byteCount + " bytes but got " + read + ".");
- }
- buffer.flip();
- return buffer.array();
- }
-
- protected void writeBytes(WritableByteChannel channel, byte[] bytes) throws IOException {
-
- // first the length of the string
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- if (bytes != null)
- buffer.putInt(bytes.length);
- else{
- buffer.putInt(-1);
- }
- buffer.flip();
- channel.write(buffer);
+ buffer.putInt(bytes.length);
+ buffer.put(bytes);
- if (bytes != null)
- channel.write(ByteBuffer.wrap(bytes));
}
public static byte[] convertObjectToBytes(Object object) throws IOException {
- /*
- * MarshalAware is requried so objects that get put into the cache can
- * take advantage of the faster way of serializaing objects, if they
- * don't want to then they can skip it or for simplicity's sake they
- * can start with Serizliazable then implement MarshalAware if
- * serizlization performance becomes a concern.
- */
- if (object instanceof MarshalAware) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
- WritableByteChannel channel = Channels.newChannel(baos);
- ((MarshalAware) object).writeExternal(channel);
- channel.close();
- return baos.toByteArray();
- } else if (object instanceof Serializable) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream stream = new ObjectOutputStream(baos);
- stream.writeObject(object);
- stream.flush();
- stream.close();
- return baos.toByteArray();
- } else {
- throw new RuntimeException("Invalid argument - Object"
- + " is not an instance of java.io.Serializable"
- + " or MarshalAware");
- }
+ /*
+ * MarshalAware is required so objects that get put into the cache can
+ * take advantage of the faster way of serializing objects. If they
+ * don't want to, they can skip it, or for simplicity's sake they can
+ * start with Serializable and then implement MarshalAware if
+ * serialization performance becomes a concern.
+ */
+ if (object instanceof MarshalAware) {
+ /**
+ * ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
+ * WritableByteChannel channel = Channels.newChannel(baos);
+ * ((MarshalAware) object).writeExternal(channel);
+ * channel.close(); return baos.toByteArray();
+ */
+ return null;
+ } else if (object instanceof Serializable) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream stream = new ObjectOutputStream(baos);
+ stream.writeObject(object);
+ stream.flush();
+ stream.close();
+ return baos.toByteArray();
+ } else {
+ throw new RuntimeException("Invalid argument - Object"
+ + " is not an instance of java.io.Serializable"
+ + " or MarshalAware");
+ }
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BulkSendCommand.java Thu Oct 19 09:12:51 2006
@@ -19,10 +19,10 @@
package org.apache.geronimo.gcache.command;
import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.geronimo.gcache.transports.CommandVisitor;
+import org.apache.mina.common.ByteBuffer;
/**
* This command is letting the client or server know that it can expect
@@ -45,16 +45,15 @@
}
- public void readExternal(ReadableByteChannel channel) throws IOException {
- super.readExternal(channel);
+ public void readExternal(ByteBuffer buffer) throws IOException{
+ super.readExternal(buffer);
- numberOfCommands = readInt(channel);
+ numberOfCommands = buffer.getInt();
}
- public void writeExternal(WritableByteChannel channel) throws IOException {
- super.writeExternal(channel);
-
- writeInt(channel, numberOfCommands);
+ public void writeExternal(ByteBuffer buffer) throws IOException {
+ super.writeExternal(buffer);
+ buffer.putInt(numberOfCommands);
}
public void execute(CommandVisitor visitor) throws IOException {
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CacheBaseCommand.java Thu Oct 19 09:12:51 2006
@@ -19,6 +19,7 @@
package org.apache.geronimo.gcache.command;
import org.apache.geronimo.gcache.util.ByteArray;
+import org.apache.mina.common.ByteBuffer;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
@@ -79,21 +80,21 @@
return new ByteArray(key);
}
- public void readExternal(ReadableByteChannel channel) throws IOException {
- super.readExternal(channel);
+ public void readExternal(ByteBuffer buffer) throws IOException{
+ super.readExternal(buffer);
//Process what we want read
- cacheName = readString(channel);
- key = readBytes(channel);
- sessionId = readString(channel);
+ cacheName = readString(buffer);
+ key = readBytes(buffer);
+ sessionId = readString(buffer);
}
- public void writeExternal(WritableByteChannel channel) throws IOException {
- super.writeExternal(channel);
+ public void writeExternal(ByteBuffer buffer) throws IOException {
+ super.writeExternal(buffer);
//Process what we want to write
- writeString(channel, cacheName);
- writeBytes(channel, key);
- writeString(channel, sessionId);
+ writeString(buffer, cacheName);
+ writeBytes(buffer, key);
+ writeString(buffer, sessionId);
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommand.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommand.java Thu Oct 19 09:12:51 2006
@@ -22,40 +22,41 @@
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-public class CachePayloadBaseCommand extends CacheBaseCommand{
+import org.apache.mina.common.ByteBuffer;
+
+public class CachePayloadBaseCommand extends CacheBaseCommand {
// this is the actual data that must be added to the distributed cache
private byte[] payload = null;
- public void readExternal(ReadableByteChannel channel) throws IOException {
- super.readExternal(channel);
+ public void readExternal(ByteBuffer buffer) throws IOException {
+ super.readExternal(buffer);
- //Process what we want to read
- payload = this.readBytes(channel);
+ // Process what we want to read
+ payload = readBytes(buffer);
}
- public void writeExternal(WritableByteChannel channel) throws IOException {
- super.writeExternal(channel);
+ public void writeExternal(ByteBuffer buffer) throws IOException {
+ super.writeExternal(buffer);
- //Process what we want to write
- writeBytes(channel, payload);
+ // Process what we want to write
+ writeBytes(buffer, payload);
}
-
public void setPayload(Object value) throws IOException {
- setRawPayload(convertObjectToBytes(value));
+ setRawPayload(convertObjectToBytes(value));
}
- public Object getPayload() throws IOException{
- return convertObjectFromBytes(getRawPayload());
+ public Object getPayload() throws IOException {
+ return convertObjectFromBytes(getRawPayload());
}
public byte[] getRawPayload() {
- return payload;
+ return payload;
}
public void setRawPayload(byte[] data) {
- this.payload = data;
+ this.payload = data;
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/MessageAckCommand.java Thu Oct 19 09:12:51 2006
@@ -19,10 +19,9 @@
package org.apache.geronimo.gcache.command;
import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
import org.apache.geronimo.gcache.transports.CommandVisitor;
+import org.apache.mina.common.ByteBuffer;
public class MessageAckCommand extends BaseCommand{
@@ -40,19 +39,19 @@
this.messageId = messageId;
}
- public void readExternal(ReadableByteChannel channel) throws IOException {
+ public void readExternal(ByteBuffer buffer) throws IOException {
- super.readExternal(channel);
+ super.readExternal(buffer);
- messageId = readLong(channel);
+ messageId = buffer.getLong();
}
- public void writeExternal(WritableByteChannel channel) throws IOException {
+ public void writeExternal(ByteBuffer buffer) throws IOException {
- super.writeExternal(channel);
+ super.writeExternal(buffer);
+ buffer.putLong(messageId);
- writeLong(channel, messageId);
}
public void execute(CommandVisitor visitor) throws IOException{
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/PutSessionCommand.java Thu Oct 19 09:12:51 2006
@@ -19,7 +19,6 @@
package org.apache.geronimo.gcache.command;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
@@ -32,23 +31,24 @@
import org.apache.geronimo.gcache.util.ByteArray;
import org.apache.geronimo.gcache.util.ByteArrayInputStream;
import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
+import org.apache.mina.common.ByteBuffer;
public class PutSessionCommand extends CachePayloadBaseCommand {
private int entryCount;
public byte getCommandType() throws IOException {
- return CommandTypes.PUT_SESSION_COMMAND;
+ return CommandTypes.PUT_SESSION_COMMAND;
}
- public void readExternal(ReadableByteChannel channel) throws IOException {
- super.readExternal(channel);
- entryCount = readInt(channel);
+ public void readExternal(ByteBuffer buffer) throws IOException {
+ super.readExternal(buffer);
+ entryCount = buffer.getInt();
}
- public void writeExternal(WritableByteChannel channel) throws IOException {
- super.writeExternal(channel);
- writeInt(channel, entryCount);
+ public void writeExternal(ByteBuffer buffer) throws IOException {
+ super.writeExternal(buffer);
+ buffer.putInt(entryCount);
}
/**
@@ -59,103 +59,93 @@
* @throws IOException
*/
public void setPayloadFromSession(Map session) throws IOException {
- Set set = session.keySet();
- entryCount = set.size();
+ ByteBuffer buffer = null;
- //Set up the output stream
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- WritableByteChannel channel = Channels.newChannel(baos);
-
- for (Object aSet : set) {
-
- buffer.rewind();
-
- byte rawKey[] = convertObjectToBytes(aSet);
-
- //Keysize
- buffer.putInt(rawKey.length);
- buffer.flip();
- channel.write(buffer);
-
- baos.write(rawKey);
-
- //Data
- buffer.rewind();
- Object objData = session.get(aSet);
- byte data[] = convertObjectToBytes(objData);
-
- //Data Size
- buffer.putInt(data.length);
- buffer.flip();
- channel.write(buffer);
-
- baos.write(data);
-
- }
-
- //Set the payload
- setRawPayload(baos.toByteArray());
+ try {
+ Set set = session.keySet();
+ entryCount = set.size();
+
+ //Set up the output stream
+ buffer = ByteBuffer.allocate(1024, false);
+ buffer.setAutoExpand(true);
+
+ for (Object aSet : set) {
+
+ byte rawKey[] = convertObjectToBytes(aSet);
+
+ //Keysize
+ buffer.putInt(rawKey.length);
+ buffer.put(rawKey);
+
+ //Data
+ Object objData = session.get(aSet);
+ byte data[] = convertObjectToBytes(objData);
+
+ //Data Size
+ buffer.putInt(data.length);
+ buffer.put(data);
+
+ }
+
+ //Set the payload
+ buffer.limit(buffer.position());
+ buffer.flip();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ WritableByteChannel channel = Channels.newChannel(baos);
+ channel.write(buffer.buf());
+ setRawPayload(baos.toByteArray());
+
+ } finally {
+ if (buffer != null)
+ buffer.release();
+ }
}
-
/**
* Returns a normal map made up of objects that have meaning to the client from the payload.
* This is used primarily by the client since the server only cares about the raw bytes.
* @return
* @throws IOException
*/
- public Map getSessionFromPayload() throws IOException{
+ public Map getSessionFromPayload() throws IOException {
- Map<Object, Object> sessionMap = Collections.synchronizedMap(new HashMap<Object, Object>());
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- ByteArrayInputStream bais = new ByteArrayInputStream(getRawPayload());
- ReadableByteChannel channel = Channels.newChannel(bais);
- int keySize;
- int dataSize;
- int read;
-
- for (int i = 0; i < this.entryCount; i++) {
- buffer.rewind();
-
- //Get the key Size
- read = channel.read(buffer);
- if (read == 4) {
- buffer.flip();
- keySize = buffer.getInt();
- } else {
- throw new IOException("Could not read the key length from record(" + i + ")");
- }
-
- //Read the key
- byte [] key = new byte[keySize];
- if (bais.read(key) != keySize)
- throw new IOException("Could not read key from record(" + i + ")");
- Object objKey = convertObjectFromBytes(key);
-
- //Get the data Size
- buffer.rewind();
- read = channel.read(buffer);
- if (read == 4) {
- buffer.flip();
- dataSize = buffer.getInt();
- } else {
- throw new IOException("Could not read the data length from record(" + i + ")");
- }
-
-
- //Get the data
- byte [] data = new byte[dataSize];
- if (bais.read(data) != dataSize)
- throw new IOException("Could not read data from record(" + i + ")");
- Object objData = convertObjectFromBytes(data);
-
- //Place the objects in the map
- sessionMap.put(objKey, objData);
- }
+ ByteBuffer buffer = null;
+ try {
- return sessionMap;
+ Map<Object, Object> sessionMap = Collections
+ .synchronizedMap(new HashMap<Object, Object>());
+ buffer = ByteBuffer.wrap(getRawPayload());
+ int keySize;
+ int dataSize;
+
+ for (int i = 0; i < this.entryCount; i++) {
+
+ keySize = buffer.getInt();
+
+ //Read the key
+ byte[] key = new byte[keySize];
+ buffer.get(key);
+ Object objKey = convertObjectFromBytes(key);
+
+ //Get the data Size
+ dataSize = buffer.getInt();
+
+ //Get the data
+ byte[] data = new byte[dataSize];
+ buffer.get(data);
+ Object objData = convertObjectFromBytes(data);
+
+ //Place the objects in the map
+ sessionMap.put(objKey, objData);
+ }
+
+ return sessionMap;
+ } finally {
+ if (buffer != null)
+ buffer.release();
+ }
}
+
/**
* Sets the payload bytes from a raw session map, or one that is made up of byes.
* This is primarily used on the server and client's shouldn't access this method.
@@ -164,43 +154,45 @@
*/
public void setPayloadFromRawSession(Map session) throws IOException {
- Set set = session.keySet();
- entryCount = set.size();
-
- //Set up the output stream
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- WritableByteChannel channel = Channels.newChannel(baos);
-
- for (Object aSet : set) {
+ ByteBuffer buffer = null;
+ try {
- buffer.rewind();
+ Set set = session.keySet();
+ entryCount = set.size();
- ByteArray key = (ByteArray) aSet;
- byte rawKey[] = key.getBytes();
-
- //Keysize
- buffer.putInt(rawKey.length);
- buffer.flip();
- channel.write(buffer);
-
- baos.write(rawKey);
-
- //Data
- buffer.rewind();
- byte data[] = (byte []) session.get(key);
-
- //Data Size
- buffer.putInt(data.length);
- buffer.flip();
- channel.write(buffer);
-
- baos.write(data);
-
- }
-
- //Set the payload
- setRawPayload(baos.toByteArray());
+ //Set up the output stream
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ buffer = ByteBuffer.allocate(1024, true);
+ buffer.setAutoExpand(true);
+ WritableByteChannel channel = Channels.newChannel(baos);
+
+ for (Object aSet : set) {
+
+ ByteArray key = (ByteArray) aSet;
+ byte rawKey[] = key.getBytes();
+
+ //Keysize
+ buffer.putInt(rawKey.length);
+ buffer.put(rawKey);
+
+ //Data
+ byte data[] = (byte[]) session.get(key);
+
+ //Data Size
+ buffer.putInt(data.length);
+ buffer.put(data);
+
+ }
+ buffer.limit(buffer.position());
+ buffer.flip();
+ channel.write(buffer.buf());
+
+ //Set the payload
+ setRawPayload(baos.toByteArray());
+ } finally {
+ if (buffer != null)
+ buffer.release();
+ }
}
/**
@@ -209,56 +201,44 @@
* @return
* @throws IOException
*/
- public Map getRawSessionFromPayload() throws IOException{
+ public Map getRawSessionFromPayload() throws IOException {
+
+ ByteBuffer buffer = null;
+ try {
+
+ Map<ByteArray, byte[]> sessionMap = Collections
+ .synchronizedMap(new HashMap<ByteArray, byte[]>());
+ buffer = ByteBuffer.wrap(getRawPayload());
+
+ int keySize;
+ int dataSize;
+
+ for (int i = 0; i < this.entryCount; i++) {
+ //Get the key Size
+ keySize = buffer.getInt();
+
+ //Read the key
+ byte[] key = new byte[keySize];
+ buffer.get(key);
+
+ //Get the data Size
+ dataSize = buffer.getInt();
- Map<ByteArray, byte[]> sessionMap = Collections.synchronizedMap(new HashMap<ByteArray, byte[]>());
- ByteBuffer buffer = ByteBuffer.allocateDirect(4);
- ByteArrayInputStream bais = new ByteArrayInputStream(getRawPayload());
- ReadableByteChannel channel = Channels.newChannel(bais);
- int keySize;
- int dataSize;
- int read;
-
- for (int i = 0; i < this.entryCount; i++) {
- buffer.rewind();
-
- //Get the key Size
- read = channel.read(buffer);
- if (read == 4) {
- buffer.flip();
- keySize = buffer.getInt();
- } else {
- throw new IOException("Could not read the key length from record(" + i + ")");
- }
-
- //Read the key
- byte [] key = new byte[keySize];
- if (bais.read(key) != keySize)
- throw new IOException("Could not read key from record(" + i + ")");
-
- //Get the data Size
- buffer.rewind();
- read = channel.read(buffer);
- if (read == 4) {
- buffer.flip();
- dataSize = buffer.getInt();
- } else {
- throw new IOException("Could not read the data length from record(" + i + ")");
- }
-
-
- //Get the data
- byte [] data = new byte[dataSize];
- if (bais.read(data) != dataSize)
- throw new IOException("Could not read data from record(" + i + ")");
+ //Get the data
+ byte[] data = new byte[dataSize];
+ buffer.get(data);
- sessionMap.put(new ByteArray(key), data);
- }
+ sessionMap.put(new ByteArray(key), data);
+ }
- return sessionMap;
+ return sessionMap;
+ } finally {
+ if (buffer != null)
+ buffer.release();
+ }
}
public void execute(CommandVisitor visitor) throws IOException {
- visitor.processPutSession(this);
+ visitor.processPutSession(this);
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/marshal/MarshalAware.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/marshal/MarshalAware.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/marshal/MarshalAware.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/marshal/MarshalAware.java Thu Oct 19 09:12:51 2006
@@ -19,9 +19,10 @@
package org.apache.geronimo.gcache.marshal;
import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
+import org.apache.mina.common.ByteBuffer;
+
/**
* Implementing this interface provides a way to optimize serilazation for
* distributed caching scenarios. If simply implementing java.io.Serializable
@@ -44,7 +45,8 @@
* @param channel
* @throws IOException
*/
- void readExternal(ReadableByteChannel channel) throws IOException;
+ void readExternal(ByteBuffer buffer) throws IOException;
+
/**
* Write data into <code>channel</code> to save the state of the object.
* The data should be written in the same order it is expected in the
@@ -53,5 +55,5 @@
* @param channel
* @throws IOException
*/
- void writeExternal(WritableByteChannel channel) throws IOException;
+ void writeExternal(ByteBuffer buffer) throws IOException;
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheServer.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheServer.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/GCacheServer.java Thu Oct 19 09:12:51 2006
@@ -24,23 +24,24 @@
public class GCacheServer {
private final Configuration config;
+
private GCacheThread gcache = null;
public GCacheServer(Configuration config) {
- this.config = config;
+ this.config = config;
}
public void start() {
- if (gcache == null) {
- gcache = new GCacheThread(config);
- gcache.start();
- }
+ if (gcache == null) {
+ gcache = new GCacheThread(config);
+ gcache.start();
+ }
}
public void stop() {
- if (gcache != null)
- gcache.halt();
- gcache = null;
+ if (gcache != null)
+ gcache.halt();
+ gcache = null;
}
public static void main(String[] args) {
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/config/Configuration.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/config/Configuration.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/config/Configuration.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/config/Configuration.java Thu Oct 19 09:12:51 2006
@@ -17,16 +17,6 @@
*/
package org.apache.geronimo.gcache.server.config;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
-
public class Configuration {
- private ThreadPool threadPool = null;
-
- public ThreadPool getThreadPool() {
- return threadPool;
- }
- public void setThreadPool(ThreadPool threadPool) {
- this.threadPool = threadPool;
- }
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/core/GCacheThread.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/core/GCacheThread.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/core/GCacheThread.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/server/core/GCacheThread.java Thu Oct 19 09:12:51 2006
@@ -20,19 +20,16 @@
import net.sf.ehcache.CacheManager;
import org.apache.geronimo.gcache.server.config.Configuration;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
public class GCacheThread extends Thread {
private boolean stopped;
private boolean running;
- private final ThreadPool pool;
private final CacheManager cacheManager;
public GCacheThread(Configuration config) {
stopped = true;
running = false;
- this.pool = config.getThreadPool();
net.sf.ehcache.config.Configuration econfig = new net.sf.ehcache.config.Configuration();
cacheManager = new CacheManager(econfig);
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/Constants.java Thu Oct 19 09:12:51 2006
@@ -20,4 +20,5 @@
public class Constants {
public final static byte MAGIC[] = new byte[]{'G', 'C', 'a', 'c', 'h', 'e'};
+ public final static int HEADER_SIZE = MAGIC.length + 1 + 4;
}
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java?view=auto&rev=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java Thu Oct 19 09:12:51 2006
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory;
+
+/** MINA codec factory for the gcache TCP transport; wires in the command decoder and encoder. */
+public class TCPCommandProtocolCodecFactory extends DemuxingProtocolCodecFactory {
+ // Register the message decoder and encoder classes with the demuxing factory.
+ public TCPCommandProtocolCodecFactory() {
+ super.register(TCPCommandRequestDecoder.class);
+ super.register(TCPCommandRequestEncoder.class);
+ }
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandProtocolCodecFactory.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java?view=auto&rev=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java Thu Oct 19 09:12:51 2006
@@ -0,0 +1,151 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.mina.filter.codec.demux.MessageDecoderAdapter;
+import org.apache.mina.filter.codec.demux.MessageDecoderResult;
+
+/** MINA message decoder for gcache TCP command packets (magic, command type, length, body). */
+public class TCPCommandRequestDecoder extends MessageDecoderAdapter {
+
+ private static final Log log = LogFactory.getLog(TCPCommandRequestDecoder.class);
+
+ /** Reports whether {@code in} holds a complete command packet; any failure yields NOT_OK. */
+ public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
+ try {
+ return messageComplete(in);
+ } catch (Exception ex) {
+ // Log through the class logger instead of dumping the trace to stderr
+ log.error("Unable to inspect command header", ex);
+ return MessageDecoderResult.NOT_OK;
+ }
+ }
+
+ /** Reads one packet from {@code in}, rebuilds the command and writes it to {@code out}. */
+ public MessageDecoderResult decode(IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out) throws Exception {
+ // Skip the magic; messageComplete() checks it during decodable()
+ byte magic[] = new byte[Constants.MAGIC.length];
+ in.get(magic);
+
+ // Get the command
+ byte commandIdentifier = in.get();
+
+ // Skip the command length; the command reads its own fields from the buffer
+ in.getInt();
+
+ Command command = CommandTypes.createCommand(commandIdentifier);
+
+ command.readExternal(in);
+
+ out.write(command);
+
+ return MessageDecoderResult.OK;
+ }
+
+ /** Reads the header from {@code in}: NEED_DATA if incomplete, NOT_OK on bad magic/length, else OK. */
+ private MessageDecoderResult messageComplete(ByteBuffer in)
+ throws Exception {
+ // HEADER BREAKDOWN
+ // ---------------------------------------------------------
+ // MAGIC HEADER - 6 bytes
+ // COMMAND - 1 byte
+ // COMMAND LENGTH - 4 bytes
+ // COMMAND - ? bytes as defined above
+
+ if (in.remaining() < Constants.HEADER_SIZE)
+ return MessageDecoderResult.NEED_DATA;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Header read:");
+ // log.debug(HexDump.hexString(buffer.array()));
+ }
+
+ // Read the magic
+ byte magic[] = new byte[Constants.MAGIC.length];
+ in.get(magic);
+
+ // Better match the Magic
+ if (!Arrays.equals(Constants.MAGIC, magic)) {
+ // Not a gcache packet: reject it rather than parse arbitrary bytes as a header
+ log.debug("Magic did not match!");
+ // TODO - the commented-out legacy code also removed the endpoint
+ // (removeEndpoint(new TCPEndpoint(channel))) and called disconnect(channel)
+ // here; the MINA equivalent still has to be wired in.
+ return MessageDecoderResult.NOT_OK;
+ }
+
+ log.debug("Magic found");
+
+ // Get the command
+ byte commandIdentifier = in.get();
+ if (log.isDebugEnabled()) {
+ log.debug("Command Identifier = " + commandIdentifier);
+ }
+
+ // Get the command length
+ int length = in.getInt();
+ if (log.isDebugEnabled()) {
+ log.debug("Command length = " + length);
+ }
+
+ // A negative length can never be satisfied, so the packet is malformed
+ if (length < 0)
+ return MessageDecoderResult.NOT_OK;
+ // Wait for the whole body; bytes beyond it belong to the next packet
+ if (in.remaining() < length)
+ return MessageDecoderResult.NEED_DATA;
+
+ /**
+ * // TODO - This really should be done with a ByteBuffer pool
+ * ByteBuffer commandBuffer = ByteBuffer.allocate(length);
+ * bcr.reset(channel, commandBuffer); count = bcr.readBuffer(length); if
+ * (count < 0) { log.debug("Client disconnected...removing endpoint."); //
+ * Remove the endpoint from the list of clients
+ * infoHolder.getEndpointManager().removeEndpoint( new
+ * TCPEndpoint(channel)); channel.close(); return; } if (count < length) { //
+ * Command was bad if (log.isDebugEnabled()) { log.debug("Command read
+ * size (" + count + ") did not equal expected length (" + length +
+ * ")"); } // TODO - Send back a resend? return; }
+ *
+ * byte commandArray[] = commandBuffer.array();
+ *
+ * if (log.isDebugEnabled()) { log.debug("Command read:");
+ * log.debug(HexDump.hexString(commandArray)); }
+ *
+ * ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer
+ * .array()); ReadableByteChannel readChannel = Channels.newChannel(bias); // Create the command and unmarshal the
+ * data Command command = CommandTypes.createCommand(commandIdentifier);
+ * if (log.isDebugEnabled()) { log.debug("Command is a " +
+ * command.getClass().getSimpleName()); }
+ * command.readExternal(readChannel);
+ */
+
+ return MessageDecoderResult.OK;
+ }
+
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestDecoder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java?view=auto&rev=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java Thu Oct 19 09:12:51 2006
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.BulkSendCommand;
+import org.apache.geronimo.gcache.command.ClearCacheCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.GetCacheCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.geronimo.gcache.command.PutEntryCommand;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.apache.geronimo.gcache.command.RemoveEntryCommand;
+import org.apache.geronimo.gcache.command.RemoveSessionCommand;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+import org.apache.mina.filter.codec.demux.MessageEncoder;
+
+/** MINA message encoder that turns gcache command objects into wire packets. */
+public class TCPCommandRequestEncoder implements MessageEncoder {
+ private static final Set TYPES;
+ // Build the unmodifiable set of command classes returned by getMessageTypes()
+ static
+ {
+ Set<Class> types= new HashSet<Class> ();
+ types.add( BulkSendCommand.class );
+ types.add( ClearCacheCommand.class );
+ types.add( GetCacheCommand.class );
+ types.add( MessageAckCommand.class );
+ types.add( PutEntryCommand.class );
+ types.add( PutSessionCommand.class );
+ types.add( RemoveEntryCommand.class );
+ types.add( RemoveSessionCommand.class );
+ TYPES = Collections.unmodifiableSet( types );
+ }
+ /** Casts obj to a BaseCommand, builds its packet and writes the wrapped bytes to out. */
+ public void encode(IoSession sess, Object obj, ProtocolEncoderOutput out) throws Exception {
+ // NOTE(review): createPacket is always called with true here; the removed sendPacket path used false inside a bulk send - confirm
+ Command command = (Command)obj;
+ out.write(ByteBuffer.wrap(((BaseCommand)command).createPacket(true)));
+ }
+
+ public Set getMessageTypes() {
+ // The command classes collected by the static initializer of this class
+ return TYPES;
+ }
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandRequestEncoder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Thu Oct 19 09:12:51 2006
@@ -44,16 +44,17 @@
import org.apache.geronimo.gcache.command.RemoveSessionCommand;
import org.apache.geronimo.gcache.transports.CommandVisitor;
import org.apache.geronimo.gcache.util.BufferChannelWriter;
+import org.apache.mina.common.IoSession;
public class TCPCommandVisitor implements CommandVisitor {
Log log = LogFactory.getLog(TCPCommandVisitor.class);
private CacheInfoHolder infoHolder;
- private SelectionKey key;
+ private IoSession sess;
- public TCPCommandVisitor(CacheInfoHolder infoHolder, SelectionKey key) {
- this.key = key;
+ public TCPCommandVisitor(CacheInfoHolder infoHolder, IoSession sess) {
+ this.sess = sess;
this.infoHolder = infoHolder;
}
@@ -108,14 +109,13 @@
// Place the raw session in the cache
try {
- cache.put(new Element(command.getSessionId(), command
- .getRawSessionFromPayload()));
+ cache.put(new Element(command.getSessionId(), command.getRawSessionFromPayload()));
// Ack the message
MessageAckCommand ack = new MessageAckCommand();
ack.setMessageId(command.getCommandId());
- byte[] packet = ack.createPacket(true);
- sendPacket(packet);
+ if (sess != null) //This line is for unit testing
+ sess.write(ack);
} catch (IOException e) {
// TODO - What should we do on an IOException, ignore it or
@@ -167,16 +167,14 @@
Cache cache = infoHolder.getCache(command.getCacheName(), true);
//Add the client endpoint
- infoHolder.getEndpointManager().addEndpoint(new TCPEndpoint(key));
+ infoHolder.getEndpointManager().addEndpoint(new TCPEndpoint(sess));
//Send a bulk command
BulkSendCommand bulk = new BulkSendCommand();
bulk.setNumberOfCommands(cache.getSize());
try {
- if (sendPacket(bulk.createPacket(true)) < 0) {
- return;
- }
-
+ sess.write(bulk);
+
for (Object key : (List<Object>) cache.getKeys()) {
Element element = cache.get(key);
Object payload = element.getValue();
@@ -198,8 +196,7 @@
}
//Send the packet. If there is a failure just abort
- if (sendPacket(newCommand.createPacket(false)) < 0)
- return;
+ sess.write(newCommand);
}
@@ -221,19 +218,4 @@
public void processBulkSend(BulkSendCommand command) {
}
- private int sendPacket(byte[] packet) throws IOException {
- // This line if for tests...key should not be null
- if (key == null)
- return 0;
- SocketChannel channel = (SocketChannel) key.channel();
- BufferChannelWriter bcw = new BufferChannelWriter(channel, ByteBuffer
- .wrap(packet));
- int written = bcw.writeBuffer(packet.length);
- if (written == -1) {
- // Remove the endpoint from the list of clients
- infoHolder.getEndpointManager().removeEndpoint(
- new TCPEndpoint(channel));
- }
- return written;
- }
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java Thu Oct 19 09:12:51 2006
@@ -17,37 +17,29 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-
import org.apache.geronimo.gcache.transports.Endpoint;
+import org.apache.mina.common.IoSession;
public class TCPEndpoint extends Object implements Endpoint{
- private SocketChannel channel;
+ private IoSession ioSession;
- public TCPEndpoint(SocketChannel channel) {
- this.channel = channel;
+ public TCPEndpoint(IoSession ioSession) {
+ this.ioSession = ioSession;
}
- //Convenience constructor to extract channel from the key
- public TCPEndpoint(SelectionKey key) {
- channel = (SocketChannel) key.channel();
- }
-
- public SocketChannel getChannel() {
- return channel;
+ public IoSession getIoSession() {
+ return ioSession;
}
@Override
public boolean equals(Object obj) {
- return channel.socket().getInetAddress().equals(obj);
+ return obj instanceof TCPEndpoint && ioSession.equals(((TCPEndpoint) obj).ioSession); // endpoints are equal when they wrap equal sessions (matches hashCode)
}
@Override
public int hashCode() {
- return channel.socket().getInetAddress().hashCode();
+ return ioSession.hashCode();
}
-
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketHandler.java Thu Oct 19 09:12:51 2006
@@ -16,111 +16,52 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
-import java.io.IOException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.geronimo.gcache.ThreadSupport;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.util.SessionLog;
/**
* This class is the main worker of the gcache functionality. Essentially this
*/
-public class TCPSocketHandler extends ThreadSupport {
- public static int DEFAULT_TIMEOUT = 2000;
- Log log = LogFactory.getLog(TCPSocketHandler.class);
-
- private SelectionKeyProcessorFactory processorFactory;
-
- private Selector selector = null;
- private int timeOut = DEFAULT_TIMEOUT;
- private ThreadPool pool;
-
- public TCPSocketHandler(int timeOut, SelectionKeyProcessorFactory processorFactory, ThreadPool pool) throws IOException {
- this.timeOut = timeOut;
-
- this.processorFactory = processorFactory;
-
- this.pool = pool;
-
- selector = Selector.open();
+public class TCPSocketHandler extends IoHandlerAdapter {
+
+ private CacheInfoHolder infoHolder = null;
+
+ public TCPSocketHandler(CacheInfoHolder infoHolder) {
+ this.infoHolder = infoHolder;
}
- public void register(SelectableChannel channel, int ops) throws IOException {
- channel.configureBlocking(false);
- channel.register(selector, ops);
- }
+ @Override
+ public void messageReceived(IoSession session, Object obj) throws Exception {
+
+ Command command = (Command) obj;
+
+ // Do something with the data
+ TCPCommandVisitor visitor = new TCPCommandVisitor(infoHolder, session);
+
+ command.execute(visitor);
- protected void execute() {
- try {
- if (selector.keys().isEmpty()) {
- Thread.sleep(timeOut);
- return;
- }
-
- if (selector.select(timeOut) == 0)
- return;
-
- //Process the keys from the queue
- Iterator iter = selector.selectedKeys().iterator();
- while (iter.hasNext()) {
- SelectionKey key = (SelectionKey) iter.next();
-
- //Pull it off the queue for handling
- iter.remove();
-
- if (!key.isValid()) {
- key.cancel();
- }
-
- if (key.isAcceptable()) {
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel channel = server.accept();
- if (channel == null)
- continue;
- register(channel, SelectionKey.OP_READ);
- }
- if (key.isReadable()) {
- key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
- pool.execute(processorFactory.createSelectionKeyProcessor(key));
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
}
- public void halt() {
- super.halt();
-
- //Wake up the selector if its in a wait state
- selector.wakeup();
-
- //Wait until everything is shutdown
- while (isRunning()) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- }
- }
+ @Override
+ public void exceptionCaught(IoSession sess, Throwable cause) throws Exception {
+ SessionLog.error( sess, "", cause );
+ infoHolder.getEndpointManager().removeEndpoint(new TCPEndpoint(sess));
+ sess.close();
}
- protected void initialize() {
+ @Override
+ public void messageSent(IoSession arg0, Object arg1) throws Exception {
+ // TODO Auto-generated method stub
+ super.messageSent(arg0, arg1);
}
- protected void cleanUp() {
- try {
- if ((selector != null) && (selector.isOpen())) {
- selector.close();
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ @Override
+ public void sessionClosed(IoSession sess) throws Exception {
+ //Remove the client from the list
+ infoHolder.getEndpointManager().removeEndpoint(new TCPEndpoint(sess));
}
+
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPSocketTransportServer.java Thu Oct 19 09:12:51 2006
@@ -18,52 +18,46 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.CacheInfoHolder;
import org.apache.geronimo.gcache.transports.TransportServer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.ThreadModel;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.transport.socket.nio.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;
public class TCPSocketTransportServer implements TransportServer {
-
- private ServerSocketChannel server = null;
+
+ IoAcceptor acceptor = null;
private InetSocketAddress inet = null;
- private ThreadPool pool = null;
- private TCPSocketHandler handler = null;
- private int timeOut = TCPSocketHandler.DEFAULT_TIMEOUT;
- private SelectionKeyProcessorFactory processorFactory;
+ private CacheInfoHolder info;
- public TCPSocketTransportServer(String address, int port, ThreadPool threadPool, int timeOut, SelectionKeyProcessorFactory processorFactory) {
+ public TCPSocketTransportServer(String address, int port, CacheInfoHolder info) {
inet = new InetSocketAddress(address, port);
- pool = threadPool;
- this.timeOut = timeOut;
- this.processorFactory = processorFactory;
+ this.info = info;
}
public void start() throws IOException {
+
+ acceptor = new SocketAcceptor();
+ SocketAcceptorConfig cfg = new SocketAcceptorConfig();
+ cfg.setReuseAddress( true );
+ cfg.getFilterChain().addLast(
+ "protocolFilter",
+ new ProtocolCodecFilter( new TCPCommandProtocolCodecFactory() ) );
+ cfg.setThreadModel(ThreadModel.MANUAL);
- server = ServerSocketChannel.open();
- server.configureBlocking(false);
- //TODO: refactor so this impl is not created here but instead its deferred
- handler = new TCPSocketHandler(timeOut, processorFactory, pool);
-
- // bind the server to the address
- ServerSocket socket = server.socket();
- socket.setReuseAddress(true);
- socket.bind(inet);
-
- // register the handler and start looking for data
- handler.register(server, SelectionKey.OP_ACCEPT);
- handler.start();
+ acceptor.bind( inet, new TCPSocketHandler(info), cfg );
}
public void stop() throws IOException {
- handler.halt();
- if (server != null && server.isOpen())
- server.close();
+
+ if (acceptor != null){
+ acceptor.unbindAll();
+ }
}
}
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/BulkSendCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/BulkSendCommandTest.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/BulkSendCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/BulkSendCommandTest.java Thu Oct 19 09:12:51 2006
@@ -18,6 +18,7 @@
*/
package org.apache.geronimo.gcache.command;
+import org.apache.mina.common.ByteBuffer;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
@@ -36,10 +37,8 @@
//Convert the command to bytes
byte[] marshalled = command.marshal();
- ByteArrayInputStream bias = new ByteArrayInputStream(marshalled);
- ReadableByteChannel channel = Channels.newChannel(bias);
BulkSendCommand readCommand = new BulkSendCommand();
- readCommand.readExternal(channel);
+ readCommand.readExternal(ByteBuffer.wrap(marshalled));
assert readCommand.getCommandId() == commandId;
assert readCommand.getNumberOfCommands() == 8;
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CacheBaseCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CacheBaseCommandTest.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CacheBaseCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CacheBaseCommandTest.java Thu Oct 19 09:12:51 2006
@@ -18,6 +18,7 @@
*/
package org.apache.geronimo.gcache.command;
+import org.apache.mina.common.ByteBuffer;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
@@ -41,10 +42,9 @@
//Convert the command to bytes
byte[] marshalled = command.marshal();
- ByteArrayInputStream bias = new ByteArrayInputStream(marshalled);
- ReadableByteChannel channel = Channels.newChannel(bias);
CacheBaseCommand readCommand = new CacheBaseCommand();
- readCommand.readExternal(channel);
+ readCommand.readExternal(ByteBuffer.wrap(marshalled));
+
assert readCommand.getCommandId() == commandId;
assert readCommand.getKey().equals(key);
assert readCommand.getSessionId().equals(sessionId);
@@ -63,10 +63,8 @@
//Convert the command to bytes
byte[] marshalled = command.marshal();
- ByteArrayInputStream bias = new ByteArrayInputStream(marshalled);
- ReadableByteChannel channel = Channels.newChannel(bias);
CacheBaseCommand readCommand = new CacheBaseCommand();
- readCommand.readExternal(channel);
+ readCommand.readExternal(ByteBuffer.wrap(marshalled));
assert readCommand.getCommandId() == commandId;
assert readCommand.getKey().equals(key);
assert readCommand.getSessionId() == null;
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommandTest.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/CachePayloadBaseCommandTest.java Thu Oct 19 09:12:51 2006
@@ -18,10 +18,7 @@
*/
package org.apache.geronimo.gcache.command;
-import java.io.ByteArrayInputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-
+import org.apache.mina.common.ByteBuffer;
import org.testng.annotations.Test;
public class CachePayloadBaseCommandTest {
@@ -43,10 +40,8 @@
//Convert the command to bytes
byte[] marshalled = command.marshal();
- ByteArrayInputStream bias = new ByteArrayInputStream(marshalled);
- ReadableByteChannel channel = Channels.newChannel(bias);
CachePayloadBaseCommand readCommand = new CachePayloadBaseCommand();
- readCommand.readExternal(channel);
+ readCommand.readExternal(ByteBuffer.wrap(marshalled));
byte readData[] = readCommand.getRawPayload();
String readString = (String)BaseCommand.convertObjectFromBytes(readData);
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/MessageAckCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/MessageAckCommandTest.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/MessageAckCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/MessageAckCommandTest.java Thu Oct 19 09:12:51 2006
@@ -18,12 +18,9 @@
*/
package org.apache.geronimo.gcache.command;
+import org.apache.mina.common.ByteBuffer;
import org.testng.annotations.Test;
-import java.io.ByteArrayInputStream;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.Channels;
-
public class MessageAckCommandTest {
@Test
@@ -36,10 +33,8 @@
//Convert the command to bytes
byte[] marshalled = command.marshal();
- ByteArrayInputStream bias = new ByteArrayInputStream(marshalled);
- ReadableByteChannel channel = Channels.newChannel(bias);
MessageAckCommand readCommand = new MessageAckCommand();
- readCommand.readExternal(channel);
+ readCommand.readExternal(ByteBuffer.wrap(marshalled));
assert readCommand.getCommandId() == commandId;
assert readCommand.getMessageId() == 99;
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/PutSessionCommandTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/PutSessionCommandTest.java?view=diff&rev=465670&r1=465669&r2=465670
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/PutSessionCommandTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/command/PutSessionCommandTest.java Thu Oct 19 09:12:51 2006
@@ -70,7 +70,7 @@
//The session should have 3 entries
assert rawSession.size() == 3;
- //The map is already in raw formet, so use it
+ //The map is already in raw format, so use it
PutSessionCommand command = (PutSessionCommand) CommandTypes.createCommand(CommandTypes.PUT_SESSION_COMMAND);
command.setCacheName(cacheName);
command.setSessionId(sessionId);