You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/03 04:24:00 UTC
svn commit: r452292 - in /geronimo/sandbox/gcache/server/src:
main/java/org/apache/geronimo/gcache/command/
main/java/org/apache/geronimo/gcache/transports/tcp/
main/java/org/apache/geronimo/gcache/util/
test/java/org/apache/geronimo/gcache/transports/...
Author: jgenender
Date: Mon Oct 2 19:23:59 2006
New Revision: 452292
URL: http://svn.apache.org/viewvc?view=rev&rev=452292
Log:
Get server to reply with ack for put session
Added:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java (with props)
Modified:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/command/BaseCommand.java Mon Oct 2 19:23:59 2006
@@ -32,6 +32,7 @@
import org.apache.geronimo.gcache.marshal.MarshalAware;
import org.apache.geronimo.gcache.transports.CommandVisitor;
+import org.apache.geronimo.gcache.transports.tcp.Constants;
import org.apache.geronimo.gcache.util.ByteArrayInputStream;
import org.apache.geronimo.gcache.util.ByteArrayOutputStream;
import org.apache.geronimo.gcache.util.UniqueId;
@@ -55,7 +56,7 @@
/**
* The command id is used to uniquely identify this command and is
* specifically used in the case where a response is required.
- * If the command id is still set to the default 0 a unique id is generated by the
+ * If the command id is still set to the default 0 a unique id is generated by the
* UniqueId class.
* @see org.apache.geronimo.gcache.util.UniqueId
*/
@@ -70,7 +71,7 @@
* This method is provided for the cases where a specific id is required to
* coordinate with another (external) framework. Typical use will not
* require this method to be called, the default mechanism will suffice.
- *
+ *
* @param commandId
*/
public void setCommandId(long commandId) {
@@ -88,7 +89,7 @@
}
/**
- * @see org.apache.geronimo.gcache.marshal.MarshalAware.readExternal(ReadableByteChannel)
+ * @see org.apache.geronimo.gcache.marshal.MarshalAware.readExternal(ReadableByteChannel)
*/
public void readExternal(ReadableByteChannel channel) throws IOException {
// this is the root so no super impl, others should call super first
@@ -97,7 +98,7 @@
}
/**
- * @see org.apache.geronimo.gcache.marshal.MarshalAware.writeExternal(WritableByteChannel)
+ * @see org.apache.geronimo.gcache.marshal.MarshalAware.writeExternal(WritableByteChannel)
*/
public void writeExternal(WritableByteChannel channel) throws IOException {
// this is the root so no super impl, others should call super first
@@ -114,21 +115,23 @@
* type, the second 8 bytes are a checksum, the next 4 bytes are the command
* length. The remainder of the bytes will be the result of the
* writeExternal implementation of the command.
- *
+ *
* @return
* @throws IOException
*/
- // TODO: Is this the place for this method? Seems higher level to me, this
- // class should not know how
- // to make a packet IMO.
- public byte[] createPacket() throws IOException{
+ public byte[] createPacket(boolean includeMagic) throws IOException{
// COMMAND TYPE - 1 byte
// CHECKSUM - 8 bytes
// COMMAND LENGTH - 4 bytes
// COMMAND - ? bytes as defined above/
+ int startFrom = includeMagic ? Constants.MAGIC.length : 0;
+
ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
WritableByteChannel channel = Channels.newChannel(baos);
+ //Write magic if requested
+ baos.write(Constants.MAGIC);
+
//Write the command type
baos.write(getCommandType());
@@ -143,7 +146,7 @@
//TODO error handling
channel.close();
- int commandStart = 13;
+ int commandStart = startFrom + 13;
//Get the entire command
byte command[] = baos.toByteArray();
@@ -154,19 +157,13 @@
checksum.update(command, commandStart, len);
//Write the checksum
- ByteBuffer.wrap(command, 1, 8).putLong(checksum.getValue());
+ ByteBuffer.wrap(command, startFrom + 1, 8).putLong(checksum.getValue());
//Write the length of the command
- ByteBuffer.wrap(command, 9, 4).putInt(len);
+ ByteBuffer.wrap(command, startFrom + 9, 4).putInt(len);
return command;
}
- // TODO - how is marshal used differently than createPacket? Seems
- // redundant.
- // It apears from the usage that this is a test utility method that makes it
- // easier to test writeExternal/readExternal.
- // No problem with that but I'd prefer it be part of the test code instead
- // of here.
public byte[] marshal() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(34);
WritableByteChannel channel = Channels.newChannel(baos);
@@ -290,7 +287,7 @@
} else {
buffer.putInt(-1);
}
-
+
buffer.flip();
channel.write(buffer);
@@ -354,7 +351,7 @@
* MarshalAware is required so objects that get put into the cache can
* take advantage of the faster way of serializing objects, if they
* don't want to then they can skip it or for simplicity's sake they
- * can start with Serializable then implement MarshalAware if
+ * can start with Serializable then implement MarshalAware if
* serialization performance becomes a concern.
*/
if (object instanceof MarshalAware) {
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java Mon Oct 2 19:23:59 2006
@@ -91,13 +91,18 @@
//TODO - This really should be done with a ByteBuffer pool
ByteBuffer commandBuffer = ByteBuffer.allocate(length);
- count = channel.read(commandBuffer);
+ bcr.reset(channel, commandBuffer);
+ count = bcr.readBuffer(length);
+ if (count < 0) {
+ channel.close();
+ //TODO - remove the peer as the socket was closed
+ return;
+ }
if (count < length) {
//Command was bad
//TODO - Send back a resend?
+ return;
- //Exit out of here
-// break;
}
byte commandArray[] = commandBuffer.array();
@@ -116,11 +121,6 @@
TCPCommandVisitor visitor = new TCPCommandVisitor(infoHolder, key);
command.execute(visitor);
- // Test for closed connection on the client side
- if (count < 0) {
- channel.close();
- //TODO - remove the peer
- }
}
private void disconnect(SocketChannel channel) throws IOException {
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Mon Oct 2 19:23:59 2006
@@ -18,26 +18,29 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
-import org.apache.geronimo.gcache.transports.CommandVisitor;
-import org.apache.geronimo.gcache.CacheInfoHolder;
-import org.apache.geronimo.gcache.command.RemoveSessionCommand;
-import org.apache.geronimo.gcache.command.RemoveEntryCommand;
-import org.apache.geronimo.gcache.command.PutSessionCommand;
-import org.apache.geronimo.gcache.command.PutEntryCommand;
-import org.apache.geronimo.gcache.command.MessageAckCommand;
-import org.apache.geronimo.gcache.command.GetCacheCommand;
-import org.apache.geronimo.gcache.command.ClearCacheCommand;
-import org.apache.geronimo.gcache.command.BulkSendCommand;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.BulkSendCommand;
+import org.apache.geronimo.gcache.command.ClearCacheCommand;
+import org.apache.geronimo.gcache.command.GetCacheCommand;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.geronimo.gcache.command.PutEntryCommand;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.apache.geronimo.gcache.command.RemoveEntryCommand;
+import org.apache.geronimo.gcache.command.RemoveSessionCommand;
+import org.apache.geronimo.gcache.transports.CommandVisitor;
+import org.apache.geronimo.gcache.util.BufferChannelWriter;
-import java.util.Map;
-import java.util.Collections;
-import java.util.HashMap;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
public class TCPCommandVisitor implements CommandVisitor {
Log log = LogFactory.getLog(TCPCommandVisitor.class);
@@ -102,6 +105,16 @@
//Place the raw session in the cache
try {
cache.put(new Element(command.getSessionId(), command.getRawSessionFromPayload()));
+
+ //Ack the message
+ MessageAckCommand ack = new MessageAckCommand();
+ ack.setMessageId(command.getCommandId());
+ byte [] packet = ack.createPacket(true);
+ int written = sendPacket(packet);
+ if (written == -1) {
+ //TODO - This means the socket is dead and need peer removal
+ }
+
} catch (IOException e) {
log.info(e);
}
@@ -155,5 +168,15 @@
}
public void processBulkSend(BulkSendCommand command) {
+ //Empty body: this visitor currently takes no action for a BulkSendCommand
+ }
+
+ /**
+  * Writes a packet to the socket channel behind this visitor's selection key.
+  *
+  * @param packet the bytes to send
+  * @return the value reported by BufferChannelWriter.writeBuffer, or 0 when
+  *         there is no selection key to write to
+  * @throws IOException if writing to the channel fails
+  */
+ private int sendPacket(byte []packet) throws IOException {
+     //Tests may build the visitor without a key; in real use it should be set
+     if (key != null) {
+         SocketChannel socket = (SocketChannel) key.channel();
+         ByteBuffer data = ByteBuffer.wrap(packet);
+         return new BufferChannelWriter(socket, data).writeBuffer(packet.length);
+     }
+     return 0;
}
}
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelReader.java Mon Oct 2 19:23:59 2006
@@ -35,6 +35,12 @@
buffer.flip();
}
+ /**
+  * Points this reader at a different channel and buffer so the same instance
+  * can be reused. The supplied buffer is flipped as part of the reset.
+  *
+  * @param channel the socket channel to store for reading
+  * @param buffer the buffer to store for reading; it is flipped here
+  */
+ public void reset(SocketChannel channel, ByteBuffer buffer) {
+     this.buffer = buffer;
+     this.channel = channel;
+     this.buffer.flip();
+ }
+
public int readBuffer(int expectedCount) throws IOException {
int read = timedRead(expectedCount);
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java?view=auto&rev=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java Mon Oct 2 19:23:59 2006
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.geronimo.gcache.util;
+
+import java.nio.channels.SocketChannel;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+/**
+ * Writes a {@link ByteBuffer} to a {@link SocketChannel}, waiting up to
+ * {@code timeout} milliseconds for the socket to accept a partial remainder.
+ */
+public class BufferChannelWriter {
+
+    private SocketChannel channel = null;
+    private ByteBuffer buffer = null;
+    //Total time, in milliseconds, to wait for the socket to become writable
+    private long timeout = 10000;
+
+    /**
+     * @param channel the socket to write to
+     * @param buffer the data to send, positioned at the first byte to write
+     */
+    public BufferChannelWriter(SocketChannel channel, ByteBuffer buffer) {
+        this.channel = channel;
+        this.buffer = buffer;
+    }
+
+    /**
+     * Writes the buffer to the channel.
+     *
+     * @param expectedCount the number of bytes the caller expects to send
+     * @return the number of bytes written, which is less than expectedCount
+     *         if the timeout expired or the buffer ran out first, or -1 if
+     *         the channel reported a negative write count
+     * @throws IOException if the channel or the selector fails
+     */
+    public int writeBuffer(int expectedCount) throws IOException {
+        return timedWrite(expectedCount);
+    }
+
+    /**
+     * Writes what the socket accepts immediately, then waits on a private
+     * selector for OP_WRITE until everything is sent or the timeout expires.
+     */
+    private int timedWrite(int size) throws IOException {
+        int written = internalWrite();
+
+        //Done on error, if we got the full tomato, or if nothing is left to
+        //send (a blocking channel always ends up here)
+        if (written < 0 || written >= size || !buffer.hasRemaining()) {
+            return written;
+        }
+
+        //Partial write...now need to wait for the socket to drain
+        Selector selector = null;
+        SelectionKey key = null;
+        try {
+            //The opened selector must be kept and the channel registered
+            //with it, otherwise there is nothing to select on
+            selector = Selector.open();
+            key = channel.register(selector, SelectionKey.OP_WRITE);
+
+            long deadline = System.currentTimeMillis() + timeout;
+            while (written < size && buffer.hasRemaining()) {
+                long remaining = deadline - System.currentTimeMillis();
+                //select(0) would block forever, so stop once time is used up
+                if (remaining <= 0 || selector.select(remaining) == 0) {
+                    //Wakeup or timeout with nothing writable
+                    break;
+                }
+                //Clear the ready set so the next select reports fresh readiness
+                selector.selectedKeys().clear();
+
+                int moreWritten = internalWrite();
+                //Error?
+                if (moreWritten < 0) {
+                    return moreWritten;
+                }
+                written += moreWritten;
+            }
+        } finally {
+            if (key != null) {
+                key.cancel();
+            }
+            if (selector != null) {
+                selector.close();
+            }
+        }
+
+        return written;
+    }
+
+    /**
+     * Writes until the buffer is drained or the socket stops accepting data.
+     *
+     * @return the bytes written in this pass, or -1 if the channel returned
+     *         a negative count
+     */
+    private int internalWrite() throws IOException {
+        int totalBytes = 0;
+
+        while (buffer.hasRemaining()) {
+            int written = channel.write(buffer);
+            //Test for a closed socket
+            if (written < 0) {
+                //Return with EOF
+                return -1;
+            }
+            //Socket send buffer is full, come back when it is writable
+            if (written == 0) {
+                break;
+            }
+            totalBytes += written;
+        }
+
+        return totalBytes;
+    }
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/util/BufferChannelWriter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java?view=diff&rev=452292&r1=452291&r2=452292
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Mon Oct 2 19:23:59 2006
@@ -18,7 +18,12 @@
import net.sf.ehcache.CacheManager;
import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.util.BufferChannelReader;
import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
import org.apache.geronimo.gcache.server.spi.ThreadPool;
import org.apache.geronimo.gcache.transports.TransportServer;
@@ -29,8 +34,14 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.Channels;
import java.util.HashMap;
import java.util.Map;
+import java.util.Arrays;
+import java.util.zip.Checksum;
+import java.util.zip.CRC32;
+import java.io.ByteArrayInputStream;
public class TcpSocketServerTest {
@@ -58,10 +69,58 @@
command.setPayloadFromSession(session);
//Send the packet
- ByteBuffer magic = ByteBuffer.wrap(Constants.MAGIC);
- ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket());
- channel.write(magic);
+ ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket(true));
channel.write(commandBuffer);
+
+ //Now receive any data (it Should be a MessageAck)
+ ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13);
+ BufferChannelReader bcr = new BufferChannelReader(channel, receiveHeader);
+ int read = bcr.readBuffer(receiveHeader.capacity());
+ assert read == Constants.MAGIC.length + 13;
+ receiveHeader.flip();
+
+ //Read the magic
+ byte magic[] = new byte[Constants.MAGIC.length];
+ receiveHeader.get(magic);
+
+ //Better match the Magic
+ assert Arrays.equals(Constants.MAGIC, magic);
+
+ //Get the command
+ byte commandIdentifier = receiveHeader.get();
+
+ //Get the checksum
+ long checksum = receiveHeader.getLong();
+
+ //Get the command length
+ int length = receiveHeader.getInt();
+
+ commandBuffer = ByteBuffer.allocate(length);
+ bcr.reset(channel, commandBuffer);
+ int count = bcr.readBuffer(length);
+ assert count == length;
+
+ byte commandArray[] = commandBuffer.array();
+ Checksum calcChecksum = new CRC32();
+ calcChecksum.update(commandArray, 0, commandArray.length);
+ long newCheck = calcChecksum.getValue();
+
+ //Checksums match
+ assert newCheck == checksum;
+
+ //Now create the command
+ ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
+ ReadableByteChannel readChannel = Channels.newChannel(bias);
+
+ //Create the command and unmarshal the data
+ Command ackCommand = CommandTypes.createCommand(commandIdentifier);
+ ackCommand.readExternal(readChannel);
+
+ //Is the message the type we think it is?
+ assert ackCommand instanceof MessageAckCommand;
+
+ //Is the ack for the original message?
+ assert command.getCommandId() == ((MessageAckCommand)ackCommand).getMessageId();
}