You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by an...@apache.org on 2010/03/09 21:16:01 UTC

svn commit: r921095 - in /tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src: main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java

Author: antelder
Date: Tue Mar  9 20:16:01 2010
New Revision: 921095

URL: http://svn.apache.org/viewvc?rev=921095&view=rev
Log:
Update Hazelcast endpoint registry to remove endpoints from a runtime that no longer exists

Modified:
    tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
    tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java

Modified: tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java?rev=921095&r1=921094&r2=921095&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java Tue Mar  9 20:16:01 2010
@@ -21,6 +21,7 @@ package org.apache.tuscany.sca.endpoint.
 
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.tuscany.sca.runtime.Ba
 import org.apache.tuscany.sca.runtime.DomainRegistryURI;
 import org.apache.tuscany.sca.runtime.EndpointRegistry;
 import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.oasisopen.sca.ServiceRuntimeException;
 
 import com.hazelcast.config.Config;
 import com.hazelcast.config.NearCacheConfig;
@@ -42,9 +44,13 @@ import com.hazelcast.core.EntryEvent;
 import com.hazelcast.core.EntryListener;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.ILock;
 import com.hazelcast.core.IMap;
+import com.hazelcast.core.Member;
 import com.hazelcast.core.MembershipEvent;
 import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.MultiMap;
+import com.hazelcast.core.Transaction;
 import com.hazelcast.nio.Address;
 
 /**
@@ -56,8 +62,9 @@ public class HazelcastEndpointRegistry e
     protected DomainRegistryURI configURI;
 
     private HazelcastInstance hazelcastInstance;
-    protected Map<Object, Object> map;
+    protected Map<Object, Object> endpointMap;
     private Map<String, Endpoint> localEndpoints = new HashMap<String, Endpoint>();
+    private MultiMap<String, String> endpointOwners;
 
     public HazelcastEndpointRegistry(ExtensionPointRegistry registry,
                                      Map<String, String> attributes,
@@ -68,16 +75,19 @@ public class HazelcastEndpointRegistry e
     }
 
     public void start() {
-        if (map != null) {
+        if (endpointMap != null) {
             throw new IllegalStateException("The registry has already been started");
         }
         if (configURI.toString().startsWith("tuscany:vm:")) {
-            map = new HashMap<Object, Object>();
+            endpointMap = new HashMap<Object, Object>();
         } else {
             initHazelcastInstance();
             IMap imap = hazelcastInstance.getMap(configURI.getDomainName() + "/Endpoints");
             imap.addEntryListener(this, true);
-            map = imap;
+            endpointMap = imap;
+            
+            endpointOwners = hazelcastInstance.getMultiMap(configURI.getDomainName() + "/EndpointOwners");
+
             hazelcastInstance.getCluster().addMembershipListener(this);
         }
     }
@@ -86,7 +96,8 @@ public class HazelcastEndpointRegistry e
         if (hazelcastInstance != null) {
             hazelcastInstance.shutdown();
             hazelcastInstance = null;
-            map = null;
+            endpointMap = null;
+            endpointOwners = null;
         }
     }
 
@@ -136,14 +147,25 @@ public class HazelcastEndpointRegistry e
     }
 
     public void addEndpoint(Endpoint endpoint) {
-        map.put(endpoint.getURI(), endpoint);
-        localEndpoints.put(endpoint.getURI(), endpoint);
+        String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+        String endpointURI = endpoint.getURI();
+        Transaction txn = hazelcastInstance.getTransaction();
+        txn.begin();
+        try {
+            endpointMap.put(endpointURI, endpoint);
+            endpointOwners.put(localMemberAddr, endpointURI);
+            txn.commit();
+        } catch (Throwable e) {
+            txn.rollback();
+            throw new ServiceRuntimeException(e);
+        }
+        localEndpoints.put(endpointURI, endpoint);
         logger.info("Add endpoint - " + endpoint);
     }
 
     public List<Endpoint> findEndpoint(String uri) {
         List<Endpoint> foundEndpoints = new ArrayList<Endpoint>();
-        for (Object v : map.values()) {
+        for (Object v : endpointMap.values()) {
             Endpoint endpoint = (Endpoint)v;
             logger.fine("Matching against - " + endpoint);
             if (matches(uri, endpoint.getURI())) {
@@ -170,16 +192,27 @@ public class HazelcastEndpointRegistry e
     }
 
     public Endpoint getEndpoint(String uri) {
-        return (Endpoint)map.get(uri);
+        return (Endpoint)endpointMap.get(uri);
     }
 
     public List<Endpoint> getEndpoints() {
-        return new ArrayList(map.values());
+        return new ArrayList(endpointMap.values());
     }
 
     public void removeEndpoint(Endpoint endpoint) {
-        map.remove(endpoint.getURI());
-        localEndpoints.remove(endpoint.getURI());
+        String localMemberAddr = hazelcastInstance.getCluster().getLocalMember().getInetSocketAddress().toString();
+        String endpointURI = endpoint.getURI();
+        Transaction txn = hazelcastInstance.getTransaction();
+        txn.begin();
+        try {
+            endpointMap.remove(endpointURI);
+            endpointOwners.remove(localMemberAddr, endpointURI);
+            txn.commit();
+        } catch (Throwable e) {
+            txn.rollback();
+            throw new ServiceRuntimeException(e);
+        }
+        localEndpoints.remove(endpointURI);
         logger.info("Removed endpoint - " + endpoint);
     }
 
@@ -229,6 +262,29 @@ public class HazelcastEndpointRegistry e
     }
 
     public void memberRemoved(MembershipEvent event) {
+        try {
+            String memberAddr = event.getMember().getInetSocketAddress().toString();
+            if (endpointOwners.containsKey(memberAddr)) {
+                ILock lock = hazelcastInstance.getLock("EndpointOwners/" + memberAddr);
+                lock.lock();
+                try {
+                    if (endpointOwners.containsKey(memberAddr)) {
+                        Collection<String> keys = endpointOwners.remove(memberAddr);
+                        for (Object k : keys) {
+                            endpointMap.remove(k);
+                        }
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            }
+        } catch (Exception e) {
+            if (e.getCause() != null && e.getCause().getCause() != null) {
+                // ignore hazelcast already shutdown exception
+                if (!"Hazelcast Instance is not active!".equals(e.getCause().getCause().getMessage())) {
+                    throw new ServiceRuntimeException(e);
+                }
+            }
+        }
     }
-
 }

Modified: tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java?rev=921095&r1=921094&r2=921095&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java Tue Mar  9 20:16:01 2010
@@ -19,10 +19,6 @@
 
 package org.apache.tuscany.sca.endpoint.hazelcast;
 
-import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.tuscany.sca.assembly.AssemblyFactory;
 import org.apache.tuscany.sca.assembly.Binding;
 import org.apache.tuscany.sca.assembly.Component;
@@ -31,18 +27,12 @@ import org.apache.tuscany.sca.assembly.S
 import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
 import org.apache.tuscany.sca.core.ExtensionPointRegistry;
 import org.apache.tuscany.sca.core.FactoryExtensionPoint;
-import org.apache.tuscany.sca.runtime.EndpointListener;
 import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
-import com.hazelcast.core.IMap;
-
-// Ignore so its not run in the build yet till its working
-@Ignore("Hazelcast doesn't support the map entry management by members")
-public class MultiRegTestCase implements EndpointListener {
+public class MultiRegTestCase {
     private static ExtensionPointRegistry extensionPoints;
     private static AssemblyFactory assemblyFactory;
     private static SCABindingFactory scaBindingFactory;
@@ -57,97 +47,69 @@ public class MultiRegTestCase implements
 
     @Test
     public void testReplication() throws Exception {
-        RuntimeEndpoint ep1 = createEndpoint("ep1uri");
 
-        String host = InetAddress.getLocalHost().getHostAddress();
-        String bind = null; // "9.65.158.31";
-        String port1 = "8085";
-        String port2 = "8086";
-        String port3 = "8087";
-        String range = "1";
-
-        Map<String, String> attrs1 = new HashMap<String, String>();
-        // attrs1.put("nomcast", "true");
-        attrs1.put("bind", bind);
-        attrs1.put("receiverPort", port1);
-        attrs1.put("receiverAutoBind", range);
-        // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
-        HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, attrs1, "tuscany:foo", "bar");
-        reg1.addListener(this);
+        System.out.println("Starting reg1");
+        HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9876&multicast=off", "bar");
         reg1.start();
 
-        Map<String, String> attrs2 = new HashMap<String, String>();
-        // attrs2.put("nomcast", "true");
-        attrs1.put("bind", bind);
-        attrs2.put("receiverPort", port2);
-        attrs2.put("receiverAutoBind", range);
-        // attrs2.put("routes", host + ":"+port1);
-        HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, attrs2, "tuscany:foo", "bar");
-        reg2.addListener(this);
+        System.out.println("Adding ep1");
+        RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+        ep1.bind(extensionPoints, reg1);
+        reg1.addEndpoint(ep1);
+
+        System.out.println("Starting reg3");
+        HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9877&multicast=off&remotes=127.0.0.1:9876", "bar");
         reg2.start();
 
-        Map<String, String> attrs3 = new HashMap<String, String>();
-        // attrs3.put("nomcast", "true");
-        attrs1.put("bind", bind);
-        attrs3.put("receiverPort", port3);
-        attrs3.put("receiverAutoBind", range);
-        // attrs3.put("routes", host + ":"+port1);
-        HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, attrs3, "tuscany:foo", "bar");
-        reg3.addListener(this);
+        System.out.println("Starting reg2");
+        HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, null, "tuscany:foo?listen=127.0.0.1:9878&multicast=off&remotes=127.0.0.1:9877", "bar");
         reg3.start();
 
-        ep1.bind(extensionPoints, reg1);
-        reg1.addEndpoint(ep1);
         assertExists(reg1, "ep1uri");
         assertExists(reg2, "ep1uri");
         assertExists(reg3, "ep1uri");
 
+        System.out.println("Adding ep2");
         RuntimeEndpoint ep2 = createEndpoint("ep2uri");
         ep2.bind(extensionPoints, reg2);
         reg2.addEndpoint(ep2);
+
         assertExists(reg2, "ep2uri");
         assertExists(reg1, "ep2uri");
         assertExists(reg3, "ep2uri");
         
-        System.out.println(((IMap)reg1.map).localKeySet().size());
-        System.out.println(((IMap)reg2.map).localKeySet().size());
-        System.out.println(((IMap)reg3.map).localKeySet().size());
-
+        System.out.println("Stopping reg1");
         reg1.stop();
-        Thread.sleep(6000);
+        System.out.println("Stopped reg1");
+        Thread.sleep(500);
+
         Assert.assertNull(reg2.getEndpoint("ep1uri"));
         Assert.assertNull(reg3.getEndpoint("ep1uri"));
 
-        System.out.println(((IMap)reg2.map).localKeySet().size());
-        System.out.println(((IMap)reg3.map).localKeySet().size());
-
         assertExists(reg2, "ep2uri");
         assertExists(reg3, "ep2uri");
         
+        System.out.println("Starting reg1");
         reg1.start();
         ep1.bind(extensionPoints, reg1);
+
+        System.out.println("adding ep1");
         reg1.addEndpoint(ep1);
         assertExists(reg1, "ep1uri");
         assertExists(reg2, "ep1uri");
         assertExists(reg3, "ep1uri");
         
+        System.out.println("Stopping reg1");
         reg1.stop();
+        System.out.println("Stopping reg2");
         reg2.stop();
+        System.out.println("Stopping reg3");
         reg3.stop();
-        System.out.println(); // closed
+        System.out.println("done");
     }
 
     private Endpoint assertExists(HazelcastEndpointRegistry reg, String uri) throws InterruptedException {
-        Endpoint ep = null;
-        int count = 0;
-        while (ep == null && count < 15) {
-            ep = reg.getEndpoint(uri);
-            if (ep == null) {
-                Thread.sleep(1000);
-                System.out.println(reg + ": tries=" + count);
-            }
-            count++;
-        }
+        Endpoint ep = reg.getEndpoint(uri);
         Assert.assertNotNull(ep);
         Assert.assertEquals(uri, ep.getURI());
         return ep;
@@ -164,20 +126,4 @@ public class MultiRegTestCase implements
         return ep;
     }
     
-    private void print(String prefix, Endpoint ep) {
-        System.out.println(prefix + ": "+ep);
-    }
-
-    public void endpointAdded(Endpoint endpoint) {
-        print("Added", endpoint);
-    }
-
-    public void endpointRemoved(Endpoint endpoint) {
-        print("Removed", endpoint);
-    }
-
-    public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
-        print("Updated", newEndpoint);
-    }
-
 }