You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tomcat.apache.org by fh...@apache.org on 2006/03/17 17:35:33 UTC

svn commit: r386668 - in /tomcat/container/tc5.5.x/modules/groupcom: ./ src/share/org/apache/catalina/tribes/tcp/bio/ src/share/org/apache/catalina/tribes/tipis/

Author: fhanik
Date: Fri Mar 17 08:35:32 2006
New Revision: 386668

URL: http://svn.apache.org/viewcvs?rev=386668&view=rev
Log:
Added new todo items,
Added a replicated hash map, all-to-all replication
Simplified BioSender, writeData isn't needed if it is a 2 line method

Added:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
Modified:
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
    tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
    tomcat/container/tc5.5.x/modules/groupcom/to-do.txt

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java?rev=386668&r1=386667&r2=386668&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioSender.java Fri Mar 17 08:35:32 2006
@@ -267,22 +267,12 @@
         keepalive();
         if ( reconnect ) closeSocket();
         if (!isConnected()) openSocket();
-        writeData(data);
-    }
-    
-    /**
-     * Sent real cluster Message to socket stream
-     * FIXME send compress
-     * @param data
-     * @throws IOException
-     * @since 5.5.10
-     */
-    protected  void writeData(byte[] data) throws IOException { 
         soOut.write(data);
         soOut.flush();
         if (getWaitForAck()) waitForAck();
-    }
 
+    }
+    
     /**
      * Wait for Acknowledgement from other server
      * FIXME Please, not wait only for three charcters, better control that the wait ack message is correct.

Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java?rev=386668&r1=386667&r2=386668&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/LazyReplicatedMap.java Fri Mar 17 08:35:32 2006
@@ -15,19 +15,11 @@
  */
 package org.apache.catalina.tribes.tipis;
 
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
@@ -37,9 +29,6 @@
 import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.MembershipListener;
-import org.apache.catalina.tribes.io.DirectByteArrayOutputStream;
-import org.apache.catalina.tribes.io.XByteBuffer;
-import org.apache.catalina.tribes.mcast.MemberImpl;
 
 /**
  * A smart implementation of a stateful replicated map. uses primary/secondary backup strategy. 

Added: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java?rev=386668&view=auto
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java (added)
+++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/ReplicatedMap.java Fri Mar 17 08:35:32 2006
@@ -0,0 +1,281 @@
+/*
+ * Copyright 1999,2004-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.tipis;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.MembershipListener;
+
+/**
+ * All-to-all replication for a hash map implementation. Each node in the cluster will carry an identical 
+ * copy of the map.<br><br>
+ * This map implementation doesn't have a background thread running to replicate changes.
+ * If you do have changes without invoking put/remove then you need to invoke one of the following methods:
+ * <ul>
+ * <li><code>replicate(Object,boolean)</code> - replicates only the object that belongs to the key</li>
+ * <li><code>replicate(boolean)</code> - Scans the entire map for changes and replicates data</li>
+ *  </ul>
+ * the <code>boolean</code> value in the <code>replicate</code> method used to decide
+ * whether to only replicate objects that implement the <code>ReplicatedMapEntry</code> interface
+ * or to replicate all objects. If an object doesn't implement the <code>ReplicatedMapEntry</code> interface
+ * each time the object gets replicated the entire object gets serialized, hence a call to <code>replicate(true)</code>
+ * will replicate all objects in this map that are using this node as primary.
+ *
+ * <br><br><b>REMBER TO CALL <code>breakdown()</code> or <code>finalize()</code> when you are done with the map to
+ * avoid memory leaks.<br><br>
+ * @todo implement periodic sync/transfer thread
+ * @author Filip Hanik
+ * @version 1.0
+ */
+public class ReplicatedMap extends AbstractReplicatedMap implements RpcCallback, ChannelListener, MembershipListener {
+
+    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ReplicatedMap.class);
+
+//------------------------------------------------------------------------------
+//              CONSTRUCTORS / DESTRUCTORS
+//------------------------------------------------------------------------------
+    /**
+     * Creates a new map
+     * @param channel The channel to use for communication
+     * @param timeout long - timeout for RPC messags
+     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+     * @param initialCapacity int - the size of this map, see HashMap
+     * @param loadFactor float - load factor, see HashMap
+     */
+    public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity,
+                             float loadFactor) {
+        super(channel, timeout, mapContextName, initialCapacity, loadFactor);
+    }
+
+    /**
+     * Creates a new map
+     * @param channel The channel to use for communication
+     * @param timeout long - timeout for RPC messags
+     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+     * @param initialCapacity int - the size of this map, see HashMap
+     */
+    public ReplicatedMap(Channel channel, long timeout, String mapContextName, int initialCapacity) {
+        super(channel, timeout, mapContextName, initialCapacity);
+    }
+
+    /**
+     * Creates a new map
+     * @param channel The channel to use for communication
+     * @param timeout long - timeout for RPC messags
+     * @param mapContextName String - unique name for this map, to allow multiple maps per channel
+     */
+    public ReplicatedMap(Channel channel, long timeout, String mapContextName) {
+        super(channel, timeout, mapContextName);
+    }
+
+//------------------------------------------------------------------------------
+//              METHODS TO OVERRIDE
+//------------------------------------------------------------------------------
+    /**
+     * publish info about a map pair (key/value) to other nodes in the cluster
+     * @param key Object
+     * @param value Object
+     * @return Member - the backup node
+     * @throws ChannelException
+     */
+    protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
+        //select a backup node
+        Member[] backup = getMapMembers();
+
+        if (backup == null || backup.length == 0) return null;
+
+        //publish the data out to all nodes
+        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_BACKUP, false,
+                                        (Serializable) key, null, null, backup);
+
+        getChannel().send(getMapMembers(), msg);
+
+        return backup;
+    }
+
+    public Object get(Object key) {
+        MapEntry entry = (MapEntry)super.get(key);
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
+    }
+
+    /**
+     * Returns true if the key has an entry in the map.
+     * The entry can be a proxy or a backup entry, invoking <code>get(key)</code>
+     * will make this entry primary for the group
+     * @param key Object
+     * @return boolean
+     */
+    public boolean containsKey(Object key) {
+        return super.containsKey(key);
+    }
+
+    public Object put(Object key, Object value) {
+        if (! (key instanceof Serializable))throw new IllegalArgumentException("Key is not serializable:" +key.getClass().getName());
+        if (value == null)return remove(key);
+        if (! (value instanceof Serializable))throw new IllegalArgumentException("Value is not serializable:" +value.getClass().getName());
+
+        MapEntry entry = new MapEntry( (Serializable) key, (Serializable) value);
+        entry.setBackup(false);
+        entry.setProxy(false);
+
+        Object old = null;
+
+        //make sure that any old values get removed
+        if (containsKey(key)) old = remove(key);
+        try {
+            Member[] backup = publishEntryInfo(key, value);
+            entry.setBackupNodes(backup);
+        } catch (ChannelException x) {
+            log.error("Unable to replicate out data for a LazyReplicatedMap.put operation", x);
+        }
+        super.put(key, entry);
+        return old;
+    }
+
+    /**
+     * Copies all values from one map to this instance
+     * @param m Map
+     * @todo send one bulk message
+     */
+    public void putAll(Map m) {
+        Iterator i = m.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry entry = (Map.Entry) i.next();
+            put(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Removes an object from this map, it will also remove it from
+     *
+     * @param key Object
+     * @return Object
+     */
+    public Object remove(Object key) {
+        MapEntry entry = (MapEntry)super.remove(key);
+        MapMessage msg = new MapMessage(getMapContextName(), MapMessage.MSG_REMOVE, false, (Serializable) key, null, null, null);
+        try {
+            getChannel().send(getMapMembers(), msg);
+        } catch (ChannelException x) {
+            log.error("Unable to replicate out data for a LazyReplicatedMap.remove operation", x);
+        }
+        return entry != null ? entry.getValue() : null;
+    }
+
+    public void clear() {
+        //only delete active keys
+        Iterator keys = keySet().iterator();
+        while (keys.hasNext()) remove(keys.next());
+    }
+
+    public boolean containsValue(Object value) {
+        if (value == null) {
+            return super.containsValue(value);
+        } else {
+            Iterator i = super.entrySet().iterator();
+            while (i.hasNext()) {
+                Map.Entry e = (Map.Entry) i.next();
+                MapEntry entry = (MapEntry) e.getValue();
+                if ( value.equals(entry.getValue()) ) return true;
+            } //while
+            return false;
+        } //end if
+    }
+
+    public Object clone() {
+        throw new UnsupportedOperationException("This operation is not valid on a replicated map");
+    }
+
+    /**
+     * Returns the entire contents of the map
+     * Map.Entry.getValue() will return a LazyReplicatedMap.MapEntry object containing all the information
+     * about the object.
+     * @return Set
+     */
+    public Set entrySetFull() {
+        return super.entrySet();
+    }
+
+    public Set keySetFull() {
+        return super.keySet();
+    }
+
+    public Set entrySet() {
+        LinkedHashSet set = new LinkedHashSet(super.size());
+        Iterator i = super.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry e = (Map.Entry) i.next();
+            MapEntry entry = (MapEntry) e.getValue();
+            if (entry.isPrimary()) set.add(entry.getValue());
+        }
+        return Collections.unmodifiableSet(set);
+    }
+
+    public Set keySet() {
+        //todo implement
+        //should only return keys where this is active.
+        LinkedHashSet set = new LinkedHashSet(super.size());
+        Iterator i = super.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry e = (Map.Entry) i.next();
+            MapEntry entry = (MapEntry) e.getValue();
+            if (entry.isPrimary()) set.add(entry.getKey());
+        }
+        return Collections.unmodifiableSet(set);
+    }
+
+    public int sizeFull() {
+        return super.size();
+    }
+
+    public int size() {
+        return super.size();
+    }
+
+    protected boolean removeEldestEntry(Map.Entry eldest) {
+        return false;
+    }
+
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    public Collection values() {
+        ArrayList values = new ArrayList(super.size());
+        Iterator i = super.entrySet().iterator();
+        while (i.hasNext()) {
+            Map.Entry e = (Map.Entry) i.next();
+            MapEntry entry = (MapEntry) e.getValue();
+            if (entry.isPrimary()) values.add(entry.getValue());
+        }
+        return Collections.unmodifiableCollection(values);
+    }
+
+}
\ No newline at end of file

Modified: tomcat/container/tc5.5.x/modules/groupcom/to-do.txt
URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/to-do.txt?rev=386668&r1=386667&r2=386668&view=diff
==============================================================================
--- tomcat/container/tc5.5.x/modules/groupcom/to-do.txt (original)
+++ tomcat/container/tc5.5.x/modules/groupcom/to-do.txt Fri Mar 17 08:35:32 2006
@@ -8,12 +8,19 @@
 Code Tasks:
 ===========================================
 
-18. Implement SSL encryption over message transfers
+21. Implement a WAN membership layer, using a WANMbrInterceptor and a 
+    WAN Router/Forwarder (Tipi on top of a ManagedChannel)
+
+20. Implement a TCP membership interceptor, for guaranteed functionality, not just discovery
+
+19. Implement a hardcoded tcp membership
+
+18. Implement SSL encryption over message transfers, BIO and NIO
 
 8. WaitForCompletionInterceptor - waits for the message to get processed by all receivers before returning
-  (This is useful when synchronized=false and waitForAck=false, to improve
-parallel processing, but you want to have all messages sent in parallel and
-don't return until all have been processed on the remote end.)
+   (This is useful when synchronized=false and waitForAck=false, to improve
+   parallel processing, but you want to have all messages sent in parallel and
+   don't return until all have been processed on the remote end.)
 
 9. CoordinatorInterceptor - manages the selection of a cluster coordinator
    just had a brilliant idea, if GroupChannel keeps its own view of members,



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org