You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/17 17:35:33 UTC
svn commit: r386668 - in /tomcat/container/tc5.5.x/modules/groupcom: ./
src/share/org/apache/catalina/tribes/tcp/bio/
src/share/org/apache/catalina/tribes/tipis/
Author: fhanik
Date: Fri Mar 17 08:35:32 2006
New Revision: 386668
URL: http://svn.apache.org/viewcvs?rev=386668&view=rev
Log:
Added new todo items,
Added a replicated hash map, all-to-all replication
Simplified BioSender, writeData isn't needed if it is a 2 line method
Added:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386668&r1=386667&r2=386668&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Fri Mar 17 08:35:32 2006
@@ -267,22 +267,12 @@
keepalive();
if ( reconnect ) closeSocket();
if (!isConnected()) openSocket();
- writeData(data);
- }
-
- /**
- * Sent real cluster Message to socket stream
- * FIXME send compress
- * @param data
- * @throws IOException
- * @since 5.5.10
- */
- protected void writeData(byte[] data) throws IOException {
soOut.write(data);
soOut.flush();
if (getWaitForAck()) waitForAck();
- }
+ }
+
/**
* Wait for Acknowledgement from other server
* FIXME Please do not wait for only three characters; better to verify that the ack message is correct.
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=386668&r1=386667&r2=386668&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 17 08:35:32 2006
@@ -15,19 +15,11 @@
*/
package org.apache.catalina.tribes.tipis;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -37,9 +29,6 @@
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.mcast.MemberImpl;
/**
* A smart implementation of a stateful replicated map. Uses a primary/secondary backup strategy.
Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=386668&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Fri Mar 17 08:35:32 2006
@@ -0,0 +1,281 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+
+/**
+ * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical
+ * copy of the map.<br><br>
+ * This map implementation doesn't have a background thread running to replicate changes.
+ * If you do have changes without invoking put/remove then you need to invoke one of the following methods:
+ * <ul>
+ * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li>
+ * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li>
+ * </ul>
+ * The <code>boolean</code> value in the <code>replicate</code> method is used to decide
+ * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface
+ * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface
+ * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code>
+ * will replicate all objects in this map that are using this node as primary.
+ *
+ * <br><br><b>REMEMBER TO CALL <code>breakdown()</code> or <code>finalize()</code> when you are done with the map to
+ * avoid memory leaks.</b><br><br>
+ * @todo implement periodic sync/transfer thread
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener {
+
+ protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReplicatedMap.class);
+
+//------------------------------------------------------------------------------
+// CONSTRUCTORS / DESTRUCTORS
+//------------------------------------------------------------------------------
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ * @param loadFactor float - load factor, see HashMap
+ */
+ public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity,
+ float loadFactor) {
+ super(channel, timeout, mapContextName, initialCapacity, loadFactor);
+ }
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ * @param initialCapacity int - the size of this map, see HashMap
+ */
+ public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
+ super(channel, timeout, mapContextName, initialCapacity);
+ }
+
+ /**
+ * Creates a new map
+ * @param channel The channel to use for communication
+ * @param timeout long - timeout for RPC messages
+ * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+ */
+ public ReplicatedMap(Channel channel, long timeout, String mapContextName) {
+ super(channel, timeout, mapContextName);
+ }
+
+//------------------------------------------------------------------------------
+// METHODS TO OVERRIDE
+//------------------------------------------------------------------------------
+ /**
+ * publish info about a map pair (key/value) to other nodes in the cluster
+ * @param key Object
+ * @param value Object
+ * @return Member[] - the backup nodes (all map members), or null if there are none
+ * @throws ChannelException
+ */
+ protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
+ //select a backup node
+ Member[] backup = getMapMembers();
+
+ if (backup == null || backup.length == 0) return null;
+
+ //publish the data out to all nodes
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
+ (Serializable) key, null, null, backup);
+
+ getChannel().send(getMapMembers(), msg);
+
+ return backup;
+ }
+
+ public Object get(Object key) {
+ MapEntry entry = (MapEntry)super.get(key);
+ if (entry == null) {
+ return null;
+ }
+ return entry.getValue();
+ }
+
+ /**
+ * Returns true if the key has an entry in the map.
+ * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
+ * will make this entry primary for the group
+ * @param key Object
+ * @return boolean
+ */
+ public boolean containsKey(Object key) {
+ return super.containsKey(key);
+ }
+
+ public Object put(Object key, Object value) {
+ if (! (key instanceof Serializable))throw new IllegalArgumentException("Key is not serializable:" +key.getClass().getName());
+ if (value == null)return remove(key);
+ if (! (value instanceof Serializable))throw new IllegalArgumentException("Value is not serializable:" +value.getClass().getName());
+
+ MapEntry entry = new MapEntry( (Serializable) key, (Serializable) value);
+ entry.setBackup(false);
+ entry.setProxy(false);
+
+ Object old = null;
+
+ //make sure that any old values get removed
+ if (containsKey(key)) old = remove(key);
+ try {
+ Member[] backup = publishEntryInfo(key, value);
+ entry.setBackupNodes(backup);
+ } catch (ChannelException x) {
+ log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
+ }
+ super.put(key, entry);
+ return old;
+ }
+
+ /**
+ * Copies all values from one map to this instance
+ * @param m Map
+ * @todo send one bulk message
+ */
+ public void putAll(Map m) {
+ Iterator i = m.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry entry = (Map.Entry) i.next();
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
+ * Removes an object from this map; it will also send a remove message for the key to the map members.
+ *
+ * @param key Object
+ * @return Object
+ */
+ public Object remove(Object key) {
+ MapEntry entry = (MapEntry)super.remove(key);
+ MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+ try {
+ getChannel().send(getMapMembers(), msg);
+ } catch (ChannelException x) {
+ log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x);
+ }
+ return entry != null ? entry.getValue() : null;
+ }
+
+ public void clear() {
+ //only delete active keys
+ Iterator keys = keySet().iterator();
+ while (keys.hasNext()) remove(keys.next());
+ }
+
+ public boolean containsValue(Object value) {
+ if (value == null) {
+ return super.containsValue(value);
+ } else {
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if ( value.equals(entry.getValue()) ) return true;
+ } //while
+ return false;
+ } //end if
+ }
+
+ public Object clone() {
+ throw new UnsupportedOperationException("This operation is not valid on a replicated map");
+ }
+
+ /**
+ * Returns the entire contents of the map
+ * Map.Entry.getValue() will return a MapEntry object containing all the information
+ * about the object.
+ * @return Set
+ */
+ public Set entrySetFull() {
+ return super.entrySet();
+ }
+
+ public Set keySetFull() {
+ return super.keySet();
+ }
+
+ public Set entrySet() {
+ LinkedHashSet set = new LinkedHashSet(super.size());
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if (entry.isPrimary()) set.add(entry.getValue());
+ }
+ return Collections.unmodifiableSet(set);
+ }
+
+ public Set keySet() {
+ //todo implement
+ //should only return keys where this is active.
+ LinkedHashSet set = new LinkedHashSet(super.size());
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if (entry.isPrimary()) set.add(entry.getKey());
+ }
+ return Collections.unmodifiableSet(set);
+ }
+
+ public int sizeFull() {
+ return super.size();
+ }
+
+ public int size() {
+ return super.size();
+ }
+
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return false;
+ }
+
+ public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ public Collection values() {
+ ArrayList values = new ArrayList(super.size());
+ Iterator i = super.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry e = (Map.Entry) i.next();
+ MapEntry entry = (MapEntry) e.getValue();
+ if (entry.isPrimary()) values.add(entry.getValue());
+ }
+ return Collections.unmodifiableCollection(values);
+ }
+
+}
\ No newline at end of file
Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=386668&r1=386667&r2=386668&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri Mar 17 08:35:32 2006
@@ -8,12 +8,19 @@
Code Tasks:
===========================================
-18. Implement SSL encryption over message transfers
+21. Implement a WAN membership layer, using a WANMbrInterceptor and a
+ WAN Router/Forwarder (Tipi on top of a ManagedChannel)
+
+20. Implement a TCP membership interceptor, for guaranteed functionality, not just discovery
+
+19. Implement a hardcoded tcp membership
+
+18. Implement SSL encryption over message transfers, BIO and NIO
8. WaitForCompletionInterceptor - waits for the message to get processed by all receivers before returning
- (This is useful when synchronized=false and waitForAck=false, to improve
-parallel processing, but you want to have all messages sent in parallel and
-don't return until all have been processed on the remote end.)
+ (This is useful when synchronized=false and waitForAck=false, to improve
+ parallel processing, but you want to have all messages sent in parallel and
+ don't return until all have been processed on the remote end.)
9. CoordinatorInterceptor - manages the selection of a cluster coordinator
just had a brilliant idea, if GroupChannel keeps its own view of members,
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org