You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by jg...@apache.org on 2006/10/10 01:51:04 UTC
svn commit: r454556 - in /geronimo/sandbox/gcache/server/src:
main/java/org/apache/geronimo/gcache/
main/java/org/apache/geronimo/gcache/transports/
main/java/org/apache/geronimo/gcache/transports/tcp/
test/java/org/apache/geronimo/gcache/transports/tcp/
Author: jgenender
Date: Mon Oct 9 16:51:03 2006
New Revision: 454556
URL: http://svn.apache.org/viewvc?view=rev&rev=454556
Log:
Add the Endpoint processing
Added:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java (with props)
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java (with props)
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java (with props)
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java (with props)
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (with props)
Modified:
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/CacheInfoHolder.java Mon Oct 9 16:51:03 2006
@@ -18,17 +18,25 @@
*/
package org.apache.geronimo.gcache;
-import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+
import org.apache.geronimo.gcache.server.listeners.CacheNotifier;
import org.apache.geronimo.gcache.server.listeners.DefaultCacheNotifier;
+import org.apache.geronimo.gcache.transports.EndpointManager;
public class CacheInfoHolder {
private final CacheManager cacheManager;
private CacheNotifier cacheNotifier = null;
+ private EndpointManager endpointManager;
public CacheInfoHolder(CacheManager cacheManager) {
this.cacheManager = cacheManager;
+ this.endpointManager = new EndpointManager();
+ }
+
+ public EndpointManager getEndpointManager() {
+ return endpointManager;
}
public CacheManager getCacheManager() {
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java Mon Oct 9 16:51:03 2006
@@ -0,0 +1,22 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports;
+
+public interface Endpoint {
+
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/Endpoint.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java Mon Oct 9 16:51:03 2006
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class EndpointManager {
+
+ Set<Endpoint> endpoints = Collections.synchronizedSet(new HashSet<Endpoint>());
+
+ public void addEndpoint(Endpoint endpoint){
+ endpoints.add(endpoint);
+ }
+
+ public void removeEndpoint(Endpoint endpoint){
+ endpoints.remove(endpoint);
+ }
+
+ public boolean contains(Endpoint endpoint){
+ return endpoints.contains(endpoint);
+ }
+
+ public int size(){
+ return endpoints.size();
+ }
+
+ public Set<Endpoint> getEndpoints() {
+ return endpoints;
+ }
+
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/EndpointManager.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/DefaultSelectionKeyProcessor.java Mon Oct 9 16:51:03 2006
@@ -94,8 +94,9 @@
bcr.reset(channel, commandBuffer);
count = bcr.readBuffer(length);
if (count < 0) {
+ //Remove the endpoint from the list of clients
+ infoHolder.getEndpointManager().removeEndpoint(new TCPEndpoint(channel));
channel.close();
- //TODO - remove the peer as the socket was closed
return;
}
if (count < length) {
Modified: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java (original)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPCommandVisitor.java Mon Oct 9 16:51:03 2006
@@ -18,11 +18,22 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.BaseCommand;
import org.apache.geronimo.gcache.command.BulkSendCommand;
import org.apache.geronimo.gcache.command.ClearCacheCommand;
import org.apache.geronimo.gcache.command.GetCacheCommand;
@@ -34,149 +45,195 @@
import org.apache.geronimo.gcache.transports.CommandVisitor;
import org.apache.geronimo.gcache.util.BufferChannelWriter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
public class TCPCommandVisitor implements CommandVisitor {
Log log = LogFactory.getLog(TCPCommandVisitor.class);
private CacheInfoHolder infoHolder;
+
private SelectionKey key;
public TCPCommandVisitor(CacheInfoHolder infoHolder, SelectionKey key) {
- this.key = key;
- this.infoHolder = infoHolder;
+ this.key = key;
+ this.infoHolder = infoHolder;
}
public void processRemoveSession(RemoveSessionCommand command) {
- Cache cache = infoHolder.getCache(command.getCacheName(), true);
+ Cache cache = infoHolder.getCache(command.getCacheName(), true);
- //Be sure a session was sent
- if (command.hasSession()) {
- cache.remove(command.getSessionId());
- }
+ // Be sure a session was sent
+ if (command.hasSession()) {
+ cache.remove(command.getSessionId());
+ }
- //Notify peers
- infoHolder.getCacheNotifier().notifyRemoveSession(command);
+ // Notify peers
+ infoHolder.getCacheNotifier().notifyRemoveSession(command);
}
+ @SuppressWarnings("unchecked")
public void processRemoveEntry(RemoveEntryCommand command) {
+ Cache cache = infoHolder.getCache(command.getCacheName(), true);
- Cache cache = infoHolder.getCache(command.getCacheName(), true);
-
- //Check if we are using sessions
- if (command.hasSession()) {
+ // Check if we are using sessions
+ if (command.hasSession()) {
- Map sessionMap = null;
+ Map sessionMap = null;
- //We are so use the session maps that is stored
- Element element = cache.get(command.getSessionId());
- if (element != null) {
- sessionMap = (Map) element.getObjectValue();
- } else {
- sessionMap = Collections.synchronizedMap(new HashMap());
- }
+ // We are so use the session maps that is stored
+ Element element = cache.get(command.getSessionId());
+ if (element != null) {
+ sessionMap = (Map) element.getObjectValue();
+ } else {
+ sessionMap = Collections.synchronizedMap(new HashMap());
+ }
- sessionMap.remove(command.getHashableKey());
+ sessionMap.remove(command.getHashableKey());
- //Put the session away
- cache.put(new Element(command.getSessionId(), sessionMap));
+ // Put the session away
+ cache.put(new Element(command.getSessionId(), sessionMap));
- } else {
+ } else {
- //No session map so store the value
- cache.remove(command.getHashableKey());
- }
+ // No session map so store the value
+ cache.remove(command.getHashableKey());
+ }
- //Notify peers
- infoHolder.getCacheNotifier().notifyRemove(command);
+ // Notify peers
+ infoHolder.getCacheNotifier().notifyRemove(command);
}
public void processPutSession(PutSessionCommand command) {
- Cache cache = infoHolder.getCache(command.getCacheName(), true);
+ Cache cache = infoHolder.getCache(command.getCacheName(), true);
- //Place the raw session in the cache
- try {
- cache.put(new Element(command.getSessionId(), command.getRawSessionFromPayload()));
-
- //Ack the message
- MessageAckCommand ack = new MessageAckCommand();
- ack.setMessageId(command.getCommandId());
- byte [] packet = ack.createPacket(true);
- int written = sendPacket(packet);
- if (written == -1) {
- //TODO - This means the socket is dead and need peer removal
- }
-
- } catch (IOException e) {
- log.info(e);
- }
+ // Place the raw session in the cache
+ try {
+ cache.put(new Element(command.getSessionId(), command
+ .getRawSessionFromPayload()));
+
+ // Ack the message
+ MessageAckCommand ack = new MessageAckCommand();
+ ack.setMessageId(command.getCommandId());
+ byte[] packet = ack.createPacket(true);
+ sendPacket(packet);
+
+ } catch (IOException e) {
+ // TODO - What should we do on an IOException, ignore it or
+ // remove the client?
+ log.error(e);
+ }
- //Notify peers
- infoHolder.getCacheNotifier().notifyPutSession(command);
+ // Notify peers
+ infoHolder.getCacheNotifier().notifyPutSession(command);
}
+ @SuppressWarnings({ "unchecked" })
public void processPutEntry(PutEntryCommand command) {
- Cache cache = infoHolder.getCache(command.getCacheName(), true);
-
- //Check if we are using sessions
- if (command.hasSession()) {
-
- Map sessionMap = null;
-
- //We are so use the session maps that is stored
- Element element = cache.get(command.getSessionId());
- if (element != null) {
- sessionMap = (Map) element.getObjectValue();
- } else {
- sessionMap = Collections.synchronizedMap(new HashMap());
- }
+ Cache cache = infoHolder.getCache(command.getCacheName(), true);
- sessionMap.put(command.getHashableKey(), command.getRawPayload());
- //Put the session away
- cache.put(new Element(command.getSessionId(), sessionMap));
+ // Check if we are using sessions
+ if (command.hasSession()) {
- } else {
+ Map sessionMap = null;
- //No session map so store the value
- cache.put(new Element(command.getHashableKey(), command.getRawPayload()));
- }
+ // We are so use the session maps that is stored
+ Element element = cache.get(command.getSessionId());
+ if (element != null) {
+ sessionMap = (Map) element.getObjectValue();
+ } else {
+ sessionMap = Collections.synchronizedMap(new HashMap());
+ }
+
+ sessionMap.put(command.getHashableKey(), command.getRawPayload());
+ // Put the session away
+ cache.put(new Element(command.getSessionId(), sessionMap));
+
+ } else {
+
+ // No session map so store the value
+ cache.put(new Element(command.getHashableKey(), command
+ .getRawPayload()));
+ }
- //Notify peers
- infoHolder.getCacheNotifier().notifyPut(command);
+ // Notify peers
+ infoHolder.getCacheNotifier().notifyPut(command);
}
public void processMessageAck(MessageAckCommand command) {
}
+ @SuppressWarnings("unchecked")
public void processGetCache(GetCacheCommand command) {
+ Cache cache = infoHolder.getCache(command.getCacheName(), true);
+
+ //Add the client endpoint
+ infoHolder.getEndpointManager().addEndpoint(new TCPEndpoint(key));
+
+ //Send a bulk command
+ BulkSendCommand bulk = new BulkSendCommand();
+ bulk.setNumberOfCommands(cache.getSize());
+ try {
+ if (sendPacket(bulk.createPacket(true)) < 0) {
+ return;
+ }
+
+ for (Object key : (List<Object>) cache.getKeys()) {
+ Element element = cache.get(key);
+ Object payload = element.getValue();
+
+ BaseCommand newCommand = null;
+ // Test if we are sending a session or not
+ if (payload instanceof HashMap) {
+ PutSessionCommand psc = new PutSessionCommand();
+ psc.setCacheName(command.getCacheName());
+ psc.setSessionId((String) key);
+ psc.setPayloadFromSession((Map) payload);
+ newCommand = psc;
+ } else {
+ PutEntryCommand pec = new PutEntryCommand();
+ pec.setCacheName(command.getCacheName());
+ pec.setRawPayload((byte[]) payload);
+ pec.setRawKey((byte [])key);
+ newCommand = pec;
+ }
+
+ //Send the packet. If there is a failure just abort
+ if (sendPacket(newCommand.createPacket(false)) < 0)
+ return;
+ }
+
+
+ } catch (IOException e) {
+ // TODO - What should we do on an IOException, ignore it or
+ // remove the client?
+ log.error(e);
+ }
}
public void processClearCache(ClearCacheCommand command) {
- Cache cache = infoHolder.getCache(command.getCacheName(), true);
- cache.removeAll();
+ Cache cache = infoHolder.getCache(command.getCacheName(), true);
+ cache.removeAll();
- //Notify peers
- infoHolder.getCacheNotifier().notifyClearCache(command);
+ // Notify peers
+ infoHolder.getCacheNotifier().notifyClearCache(command);
}
public void processBulkSend(BulkSendCommand command) {
}
- private int sendPacket(byte []packet) throws IOException {
- //This line if for tests...key should not be null
- if (key == null)
- return 0;
- SocketChannel channel = (SocketChannel) key.channel();
- BufferChannelWriter bcw = new BufferChannelWriter(channel, ByteBuffer.wrap(packet));
- int written = bcw.writeBuffer(packet.length);
- return written;
+ private int sendPacket(byte[] packet) throws IOException {
+ // This line if for tests...key should not be null
+ if (key == null)
+ return 0;
+ SocketChannel channel = (SocketChannel) key.channel();
+ BufferChannelWriter bcw = new BufferChannelWriter(channel, ByteBuffer
+ .wrap(packet));
+ int written = bcw.writeBuffer(packet.length);
+ if (written == -1) {
+ // Remove the endpoint from the list of clients
+ infoHolder.getEndpointManager().removeEndpoint(
+ new TCPEndpoint(channel));
+ }
+ return written;
}
}
Added: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java (added)
+++ geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java Mon Oct 9 16:51:03 2006
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import org.apache.geronimo.gcache.transports.Endpoint;
+
+public class TCPEndpoint extends Object implements Endpoint{
+
+ private SocketChannel channel;
+
+ public TCPEndpoint(SocketChannel channel) {
+ this.channel = channel;
+ }
+
+ //Convenience constructor to extract channel from the key
+ public TCPEndpoint(SelectionKey key) {
+ channel = (SocketChannel) key.channel();
+ }
+
+ public SocketChannel getChannel() {
+ return channel;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return channel.socket().getInetAddress().equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return channel.socket().getInetAddress().hashCode();
+ }
+
+
+}
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/main/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpoint.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java (added)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java Mon Oct 9 16:51:03 2006
@@ -0,0 +1,139 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import net.sf.ehcache.CacheManager;
+
+import org.apache.geronimo.gcache.CacheInfoHolder;
+import org.apache.geronimo.gcache.command.BaseCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.CommandTypes;
+import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
+import org.apache.geronimo.gcache.server.spi.ThreadPool;
+import org.apache.geronimo.gcache.transports.TransportServer;
+import org.apache.geronimo.gcache.util.BufferChannelReader;
+import org.apache.geronimo.gcache.util.BufferChannelWriter;
+import org.apache.geronimo.gcache.util.ByteArrayInputStream;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeSuite;
+
+public abstract class AbstractServer {
+ protected static final int port = 45678;
+ protected static final String host = "localhost";
+
+ protected TransportServer server = null;
+ protected ThreadPool pool;
+ protected SocketChannel clientChannel;
+ protected CacheInfoHolder info;
+
+ @BeforeClass
+ public void setUp() throws Exception{
+ pool = new DefaultThreadPoolImpl(10);
+ CacheManager mgr = CacheManager.create();
+ info = new CacheInfoHolder(mgr);
+ DefaultSelectionKeyProcessorFactory factory = new DefaultSelectionKeyProcessorFactory(info);
+
+ server = new TCPSocketTransportServer(host, port, pool, 2000, factory);
+
+ server.start();
+
+ //Create a client socket
+ clientChannel = SocketChannel.open();
+ clientChannel.connect(new InetSocketAddress(host, port));
+ }
+
+ @AfterClass(alwaysRun=true)
+ public void shutdown() throws Exception{
+ server.stop();
+ pool.shutdown();
+
+ clientChannel.close();
+ }
+
+ protected void sendCommand(BaseCommand command) throws IOException{
+
+ byte bytes[] = command.createPacket(true);
+ BufferChannelWriter bcw = new BufferChannelWriter(clientChannel, ByteBuffer.wrap(bytes));
+ int written = bcw.writeBuffer(bytes.length);
+ assert written == bytes.length;
+
+ }
+
+ protected Command readCommand() throws IOException{
+
+ ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13);
+ BufferChannelReader bcr = new BufferChannelReader(clientChannel, receiveHeader);
+ int read = bcr.readBuffer(receiveHeader.capacity());
+ assert read == Constants.MAGIC.length + 13;
+ receiveHeader.flip();
+
+ //Read the magic
+ byte magic[] = new byte[Constants.MAGIC.length];
+ receiveHeader.get(magic);
+
+ //Better match the Magic
+ assert Arrays.equals(Constants.MAGIC, magic);
+
+ //Get the command
+ byte commandIdentifier = receiveHeader.get();
+
+ //Get the checksum
+ long checksum = receiveHeader.getLong();
+
+ //Get the command length
+ int length = receiveHeader.getInt();
+
+ //pull the command
+ ByteBuffer commandBuffer = ByteBuffer.allocate(length);
+ bcr.reset(clientChannel, commandBuffer);
+ int count = bcr.readBuffer(length);
+ assert count == length;
+
+ //Calc a checksum
+ byte commandArray[] = commandBuffer.array();
+ Checksum calcChecksum = new CRC32();
+ calcChecksum.update(commandArray, 0, commandArray.length);
+ long newCheck = calcChecksum.getValue();
+
+ //Checksums match
+ assert newCheck == checksum;
+
+ //Now create the command
+ ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
+ ReadableByteChannel readChannel = Channels.newChannel(bias);
+
+ //Create the command and unmarshal the data
+ Command command = CommandTypes.createCommand(commandIdentifier);
+ command.readExternal(readChannel);
+
+ return command;
+
+ }
+}
Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/AbstractServer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java?view=auto&rev=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java (added)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java Mon Oct 9 16:51:03 2006
@@ -0,0 +1,67 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geronimo.gcache.transports.tcp;
+
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
+import org.apache.geronimo.gcache.command.BulkSendCommand;
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.GetCacheCommand;
+import org.apache.geronimo.gcache.transports.Endpoint;
+import org.testng.annotations.Test;
+
+public class TCPEndpointTest extends AbstractServer {
+
+ @Test
+ public void testJoinCluster() throws Exception{
+
+ assert 0 == info.getEndpointManager().size();
+
+ GetCacheCommand command = new GetCacheCommand();
+
+ command.setCacheName("Cache1");
+
+ //Send the packet
+ sendCommand(command);
+
+ //Now receive any data (it Should be a BulkSendCommand)
+ Command bulk = this.readCommand();
+
+ //Is the message the type we think it is?
+ assert bulk instanceof BulkSendCommand;
+
+ int commandsToFollow = ((BulkSendCommand)bulk).getNumberOfCommands();
+
+ //Nothing is in the Cache, so no commands should follow
+ assert commandsToFollow == 0;
+
+ //Should have one client
+ assert 1 == info.getEndpointManager().size();
+
+ Set<Endpoint> set = info.getEndpointManager().getEndpoints();
+ TCPEndpoint endpoint = (TCPEndpoint)set.iterator().next();
+ SocketChannel endpointChannel = endpoint.getChannel();
+
+ //Check that the socket addresses match (Remote on server == Local for client)
+ assert endpointChannel.socket().getRemoteSocketAddress().equals(clientChannel.socket().getLocalSocketAddress());
+
+
+ }
+
+}
Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
------------------------------------------------------------------------------
svn:keywords = Date Revision
Propchange: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TCPEndpointTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java
URL: http://svn.apache.org/viewvc/geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java?view=diff&rev=454556&r1=454555&r2=454556
==============================================================================
--- geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java (original)
+++ geronimo/sandbox/gcache/server/src/test/java/org/apache/geronimo/gcache/transports/tcp/TcpSocketServerTest.java Mon Oct 9 16:51:03 2006
@@ -16,50 +16,22 @@
*/
package org.apache.geronimo.gcache.transports.tcp;
-import net.sf.ehcache.CacheManager;
-import org.apache.geronimo.gcache.CacheInfoHolder;
-import org.apache.geronimo.gcache.util.BufferChannelReader;
-import org.apache.geronimo.gcache.command.PutSessionCommand;
-import org.apache.geronimo.gcache.command.BaseCommand;
-import org.apache.geronimo.gcache.command.Command;
-import org.apache.geronimo.gcache.command.CommandTypes;
-import org.apache.geronimo.gcache.command.MessageAckCommand;
-import org.apache.geronimo.gcache.server.impl.DefaultThreadPoolImpl;
-import org.apache.geronimo.gcache.server.spi.ThreadPool;
-import org.apache.geronimo.gcache.transports.TransportServer;
-import org.testng.annotations.AfterSuite;
-import org.testng.annotations.BeforeSuite;
-import org.testng.annotations.Test;
-
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.Channels;
import java.util.HashMap;
import java.util.Map;
-import java.util.Arrays;
-import java.util.zip.Checksum;
-import java.util.zip.CRC32;
-import java.io.ByteArrayInputStream;
-
-public class TcpSocketServerTest {
- private static final int port = 45678;
- private static final String host = "localhost";
+import org.apache.geronimo.gcache.command.Command;
+import org.apache.geronimo.gcache.command.MessageAckCommand;
+import org.apache.geronimo.gcache.command.PutSessionCommand;
+import org.testng.annotations.Test;
- TransportServer server = null;
- ThreadPool pool;
+public class TcpSocketServerTest extends AbstractServer {
@Test()
- public void sendSession() throws Exception {
-
- //Create a client socket
- SocketChannel channel = SocketChannel.open();
- channel.connect(new InetSocketAddress(host, port));
+ public void sendSession() throws Exception {
//Create a session
- Map session = new HashMap();
+ Map<String,String> session = new HashMap<String, String>();
session.put("key1","data1");
session.put("key2","data2");
session.put("key3","data3");
@@ -70,53 +42,10 @@
//Send the packet
ByteBuffer commandBuffer = ByteBuffer.wrap(command.createPacket(true));
- channel.write(commandBuffer);
+ clientChannel.write(commandBuffer);
//Now receive any data (it Should be a MessageAck)
- ByteBuffer receiveHeader = ByteBuffer.allocate(Constants.MAGIC.length + 13);
- BufferChannelReader bcr = new BufferChannelReader(channel, receiveHeader);
- int read = bcr.readBuffer(receiveHeader.capacity());
- assert read == Constants.MAGIC.length + 13;
- receiveHeader.flip();
-
- //Read the magic
- byte magic[] = new byte[Constants.MAGIC.length];
- receiveHeader.get(magic);
-
- //Better match the Magic
- assert Arrays.equals(Constants.MAGIC, magic);
-
- //Get the command
- byte commandIdentifier = receiveHeader.get();
-
- //Get the checksum
- long checksum = receiveHeader.getLong();
-
- //Get the command length
- int length = receiveHeader.getInt();
-
- //pull the command
- commandBuffer = ByteBuffer.allocate(length);
- bcr.reset(channel, commandBuffer);
- int count = bcr.readBuffer(length);
- assert count == length;
-
- //Calc a checksum
- byte commandArray[] = commandBuffer.array();
- Checksum calcChecksum = new CRC32();
- calcChecksum.update(commandArray, 0, commandArray.length);
- long newCheck = calcChecksum.getValue();
-
- //Checksums match
- assert newCheck == checksum;
-
- //Now create the command
- ByteArrayInputStream bias = new ByteArrayInputStream(commandBuffer.array());
- ReadableByteChannel readChannel = Channels.newChannel(bias);
-
- //Create the command and unmarshal the data
- Command ackCommand = CommandTypes.createCommand(commandIdentifier);
- ackCommand.readExternal(readChannel);
+ Command ackCommand = this.readCommand();
//Is the message the type we think it is?
assert ackCommand instanceof MessageAckCommand;
@@ -125,24 +54,5 @@
assert command.getCommandId() == ((MessageAckCommand)ackCommand).getMessageId();
}
-
-
- @BeforeSuite
- public void setUp() throws Exception{
- pool = new DefaultThreadPoolImpl(10);
- CacheManager mgr = CacheManager.create();
- CacheInfoHolder info = new CacheInfoHolder(mgr);
- DefaultSelectionKeyProcessorFactory factory = new DefaultSelectionKeyProcessorFactory(info);
-
- server = new TCPSocketTransportServer(host, port, pool, 2000, factory);
-
- server.start();
- }
-
- @AfterSuite(alwaysRun=true)
- public void shutdown() throws Exception{
- server.stop();
- pool.shutdown();
- }
}