You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by rf...@apache.org on 2010/01/19 06:37:39 UTC

svn commit: r900661 - in /tuscany/sca-java-2.x/trunk/modules: core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/ deployment/src/main/java/org/apache/tuscany/sca/deployment/ deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/ end...

Author: rfeng
Date: Tue Jan 19 05:37:38 2010
New Revision: 900661

URL: http://svn.apache.org/viewvc?rev=900661&view=rev
Log:
Expose system definitions from the deployer
Add the removal of entries when the member leaves the group
Add listeners to the hazelcastInstance

Added:
    tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java   (with props)
Modified:
    tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
    tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java
    tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java
    tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
    tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
    tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
    tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
    tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
    tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java
    tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java
    tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java

Modified: tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java Tue Jan 19 05:37:38 2010
@@ -508,8 +508,12 @@
                     bind(compositeContext);
                 }
             }
-            RuntimeEndpointImpl ep = (RuntimeEndpointImpl)serializer.readEndpoint(xml);
-            copyFrom(ep);
+            if (serializer != null) {
+                RuntimeEndpointImpl ep = (RuntimeEndpointImpl)serializer.readEndpoint(xml);
+                copyFrom(ep);
+            } else {
+                // FIXME: [rfeng] What should we do here?
+            }
         }
         super.resolve();
     }

Modified: tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/Deployer.java Tue Jan 19 05:37:38 2010
@@ -42,6 +42,7 @@
 import org.apache.tuscany.sca.contribution.processor.ProcessorContext;
 import org.apache.tuscany.sca.core.ExtensionPointRegistry;
 import org.apache.tuscany.sca.core.LifeCycleListener;
+import org.apache.tuscany.sca.definitions.Definitions;
 import org.apache.tuscany.sca.monitor.Monitor;
 
 /**
@@ -186,14 +187,8 @@
      */
     ExtensionPointRegistry getExtensionPointRegistry();
     
-    /* 
-     * @see org.apache.tuscany.sca.core.LifeCycleListener#start()
-     */
-    void start();
-
-    /* 
-     * @see org.apache.tuscany.sca.core.LifeCycleListener#stop()
+    /**
+     * Get the system definitions   
      */
-    void stop();
-    
+    Definitions getSystemDefinitions();  
 }

Modified: tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/deployment/src/main/java/org/apache/tuscany/sca/deployment/impl/DeployerImpl.java Tue Jan 19 05:37:38 2010
@@ -50,8 +50,8 @@
 import org.apache.tuscany.sca.contribution.DefaultImport;
 import org.apache.tuscany.sca.contribution.Export;
 import org.apache.tuscany.sca.contribution.Import;
-import org.apache.tuscany.sca.contribution.namespace.NamespaceImport;
 import org.apache.tuscany.sca.contribution.java.JavaImport;
+import org.apache.tuscany.sca.contribution.namespace.NamespaceImport;
 import org.apache.tuscany.sca.contribution.processor.ContributionReadException;
 import org.apache.tuscany.sca.contribution.processor.ContributionResolveException;
 import org.apache.tuscany.sca.contribution.processor.ContributionWriteException;
@@ -568,4 +568,9 @@
         this.schemaValidationEnabled = schemaValidationEnabled;
     }
 
+    public Definitions getSystemDefinitions() {
+        init();
+        return systemDefinitions;
+    }
+
 }

Modified: tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/main/java/org/apache/tuscany/sca/endpoint/hazelcast/HazelcastEndpointRegistry.java Tue Jan 19 05:37:38 2010
@@ -38,14 +38,19 @@
 import com.hazelcast.config.Config;
 import com.hazelcast.config.TcpIpConfig;
 import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.EntryEvent;
+import com.hazelcast.core.EntryListener;
 import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
 import com.hazelcast.nio.Address;
 
 /**
  * An EndpointRegistry using a Hazelcast
  */
-public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleListener {
+public class HazelcastEndpointRegistry implements EndpointRegistry, LifeCycleListener, EntryListener<String, Endpoint>, MembershipListener {
     private final static Logger logger = Logger.getLogger(HazelcastEndpointRegistry.class.getName());
 
     private List<EndpointReference> endpointreferences = new CopyOnWriteArrayList<EndpointReference>();
@@ -54,14 +59,14 @@
     private ExtensionPointRegistry registry;
     private ConfigURI configURI;
 
-    private HazelcastInstance hazelcastInstance;
-    private Map<Object, Object> map;
+    HazelcastInstance hazelcastInstance;
+    Map<Object, Object> map;
     private List<String> localEndpoints = new ArrayList<String>();;
 
     public HazelcastEndpointRegistry(ExtensionPointRegistry registry,
-                                      Map<String, String> attributes,
-                                      String domainRegistryURI,
-                                      String domainURI) {
+                                     Map<String, String> attributes,
+                                     String domainRegistryURI,
+                                     String domainURI) {
         this.registry = registry;
         this.configURI = new ConfigURI(domainRegistryURI);
     }
@@ -74,22 +79,27 @@
             map = new HashMap<Object, Object>();
         } else {
             initHazelcastInstance();
-            map = hazelcastInstance.getMap(configURI.getDomainName() + "Endpoints");
+            IMap imap = hazelcastInstance.getMap(configURI.getDomainName() + "/Endpoints");
+            imap.addEntryListener(this, true);
+            map = imap;
+            hazelcastInstance.getCluster().addMembershipListener(this);
         }
     }
 
     public void stop() {
         if (hazelcastInstance != null) {
             hazelcastInstance.shutdown();
+            hazelcastInstance = null;
+            map = null;
         }
     }
 
-    private void initHazelcastInstance()  {
+    private void initHazelcastInstance() {
         Config config = new XmlConfigBuilder().build();
 
         config.setPort(configURI.getListenPort());
         //config.setPortAutoIncrement(false);
-        
+
         if (configURI.getBindAddress() != null) {
             config.getNetworkConfig().getInterfaces().setEnabled(true);
             config.getNetworkConfig().getInterfaces().clear();
@@ -107,6 +117,8 @@
             config.getNetworkConfig().getJoin().getMulticastConfig().setMulticastGroup(configURI.getMulticastAddress());
         }
         
+        // config.getMapConfig(configURI.getDomainName() + "/Endpoints").setBackupCount(0);
+
         if (configURI.getRemotes().size() > 0) {
             TcpIpConfig tcpconfig = config.getNetworkConfig().getJoin().getJoinMembers();
             tcpconfig.setEnabled(true);
@@ -200,15 +212,15 @@
                         endpoint.setRemote(true);
                     }
                     // if (!entry.isPrimary()) {
-                    ((RuntimeEndpoint) endpoint).bind(registry, this);
+                    ((RuntimeEndpoint)endpoint).bind(registry, this);
                     // }
                     foundEndpoints.add(endpoint);
                     logger.fine("Found endpoint with matching service  - " + endpoint);
-                } 
+                }
                 // else the service name doesn't match
             }
         }
-        
+
         return foundEndpoints;
     }
 
@@ -252,46 +264,65 @@
     }
 
     public void updateEndpoint(String uri, Endpoint endpoint) {
-//      // TODO: is updateEndpoint needed?
-//      throw new UnsupportedOperationException();
+        //      // TODO: is updateEndpoint needed?
+        //      throw new UnsupportedOperationException();
+    }
+
+    public void entryAdded(EntryEvent<String, Endpoint> event) {
+        entryAdded(event.getKey(), event.getValue());
     }
 
-//    public void entryAdded(Object key, Object value) {
-//        MapEntry entry = (MapEntry)value;
-//        Endpoint newEp = (Endpoint)entry.getValue();
-//        if (!isLocal(entry)) {
-//            logger.info(id + " Remote endpoint added: " + entry.getValue());
-//            newEp.setRemote(true);
-//        }
-//        ((RuntimeEndpoint) newEp).bind(registry, this);
-//        for (EndpointListener listener : listeners) {
-//            listener.endpointAdded(newEp);
-//        }
-//    }
-//
-//    public void entryRemoved(Object key, Object value) {
-//        MapEntry entry = (MapEntry)value;
-//        if (!isLocal(entry)) {
-//            logger.info(id + " Remote endpoint removed: " + entry.getValue());
-//        }
-//        Endpoint oldEp = (Endpoint)entry.getValue();
-//        for (EndpointListener listener : listeners) {
-//            listener.endpointRemoved(oldEp);
-//        }
-//    }
-//
-//    public void entryUpdated(Object key, Object oldValue, Object newValue) {
-//        MapEntry oldEntry = (MapEntry)oldValue;
-//        MapEntry newEntry = (MapEntry)newValue;
-//        if (!isLocal(newEntry)) {
-//            logger.info(id + " Remote endpoint updated: " + newEntry.getValue());
-//        }
-//        Endpoint oldEp = (Endpoint)oldEntry.getValue();
-//        Endpoint newEp = (Endpoint)newEntry.getValue();
-//        ((RuntimeEndpoint) newEp).bind(registry, this);
-//        for (EndpointListener listener : listeners) {
-//            listener.endpointUpdated(oldEp, newEp);
-//        }
-//    }
+    public void entryEvicted(EntryEvent<String, Endpoint> event) {
+        // Should not happen
+    }
+
+    public void entryRemoved(EntryEvent<String, Endpoint> event) {
+        entryRemoved(event.getKey(), event.getValue());
+    }
+
+    public void entryUpdated(EntryEvent<String, Endpoint> event) {
+        entryUpdated(event.getKey(), null, event.getValue());
+    }
+
+    public void entryAdded(Object key, Object value) {
+        Endpoint newEp = (Endpoint)value;
+        if (!isLocal(newEp)) {
+            logger.info(" Remote endpoint added: " + newEp);
+            newEp.setRemote(true);
+        }
+        ((RuntimeEndpoint)newEp).bind(registry, this);
+        for (EndpointListener listener : listeners) {
+            listener.endpointAdded(newEp);
+        }
+    }
+
+    public void entryRemoved(Object key, Object value) {
+        Endpoint oldEp = (Endpoint)value;
+        if (!isLocal(oldEp)) {
+            logger.info(" Remote endpoint removed: " + value);
+        }
+        ((RuntimeEndpoint) oldEp).bind(registry, this);
+        for (EndpointListener listener : listeners) {
+            listener.endpointRemoved(oldEp);
+        }
+    }
+
+    public void entryUpdated(Object key, Object oldValue, Object newValue) {
+        Endpoint oldEp = (Endpoint)oldValue;
+        Endpoint newEp = (Endpoint)newValue;
+        if (!isLocal(newEp)) {
+            logger.info(" Remote endpoint updated: " + newEp);
+        }
+        ((RuntimeEndpoint)newEp).bind(registry, this);
+        for (EndpointListener listener : listeners) {
+            listener.endpointUpdated(oldEp, newEp);
+        }
+    }
+
+    public void memberAdded(MembershipEvent event) {
+    }
+
+    public void memberRemoved(MembershipEvent event) {
+    }
 
 }

Added: tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java?rev=900661&view=auto
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java (added)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java Tue Jan 19 05:37:38 2010
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+
+package org.apache.tuscany.sca.endpoint.hazelcast;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.tuscany.sca.assembly.AssemblyFactory;
+import org.apache.tuscany.sca.assembly.Binding;
+import org.apache.tuscany.sca.assembly.Component;
+import org.apache.tuscany.sca.assembly.Endpoint;
+import org.apache.tuscany.sca.assembly.SCABindingFactory;
+import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.hazelcast.core.IMap;
+
+// Ignore so its not run in the build yet till its working
+@Ignore("Hazelcast doesn't support the map entry management by members")
+public class MultiRegTestCase implements EndpointListener {
+    private static ExtensionPointRegistry extensionPoints;
+    private static AssemblyFactory assemblyFactory;
+    private static SCABindingFactory scaBindingFactory;
+
+    @BeforeClass
+    public static void init() {
+        extensionPoints = new DefaultExtensionPointRegistry();
+        FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
+        assemblyFactory = factories.getFactory(AssemblyFactory.class);
+        scaBindingFactory = factories.getFactory(SCABindingFactory.class);
+    }
+
+    @Test
+    public void testReplication() throws Exception {
+        RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+
+        String host = InetAddress.getLocalHost().getHostAddress();
+        String bind = null; // "9.65.158.31";
+        String port1 = "8085";
+        String port2 = "8086";
+        String port3 = "8087";
+        String range = "1";
+
+        Map<String, String> attrs1 = new HashMap<String, String>();
+        // attrs1.put("nomcast", "true");
+        attrs1.put("bind", bind);
+        attrs1.put("receiverPort", port1);
+        attrs1.put("receiverAutoBind", range);
+        // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
+        HazelcastEndpointRegistry reg1 = new HazelcastEndpointRegistry(extensionPoints, attrs1, "tuscany:foo", "bar");
+        reg1.addListener(this);
+        reg1.start();
+
+        Map<String, String> attrs2 = new HashMap<String, String>();
+        // attrs2.put("nomcast", "true");
+        attrs1.put("bind", bind);
+        attrs2.put("receiverPort", port2);
+        attrs2.put("receiverAutoBind", range);
+        // attrs2.put("routes", host + ":"+port1);
+        HazelcastEndpointRegistry reg2 = new HazelcastEndpointRegistry(extensionPoints, attrs2, "tuscany:foo", "bar");
+        reg2.addListener(this);
+        reg2.start();
+
+        Map<String, String> attrs3 = new HashMap<String, String>();
+        // attrs3.put("nomcast", "true");
+        attrs1.put("bind", bind);
+        attrs3.put("receiverPort", port3);
+        attrs3.put("receiverAutoBind", range);
+        // attrs3.put("routes", host + ":"+port1);
+        HazelcastEndpointRegistry reg3 = new HazelcastEndpointRegistry(extensionPoints, attrs3, "tuscany:foo", "bar");
+        reg3.addListener(this);
+        reg3.start();
+
+        ep1.bind(extensionPoints, reg1);
+        reg1.addEndpoint(ep1);
+        assertExists(reg1, "ep1uri");
+        assertExists(reg2, "ep1uri");
+        assertExists(reg3, "ep1uri");
+
+        RuntimeEndpoint ep2 = createEndpoint("ep2uri");
+        ep2.bind(extensionPoints, reg2);
+        reg2.addEndpoint(ep2);
+        assertExists(reg2, "ep2uri");
+        assertExists(reg1, "ep2uri");
+        assertExists(reg3, "ep2uri");
+        
+        System.out.println(((IMap)reg1.map).localKeySet().size());
+        System.out.println(((IMap)reg2.map).localKeySet().size());
+        System.out.println(((IMap)reg3.map).localKeySet().size());
+
+        reg1.stop();
+        Thread.sleep(6000);
+        Assert.assertNull(reg2.getEndpoint("ep1uri"));
+        Assert.assertNull(reg3.getEndpoint("ep1uri"));
+
+        System.out.println(((IMap)reg2.map).localKeySet().size());
+        System.out.println(((IMap)reg3.map).localKeySet().size());
+
+        assertExists(reg2, "ep2uri");
+        assertExists(reg3, "ep2uri");
+        
+        reg1.start();
+        ep1.bind(extensionPoints, reg1);
+        reg1.addEndpoint(ep1);
+        assertExists(reg1, "ep1uri");
+        assertExists(reg2, "ep1uri");
+        assertExists(reg3, "ep1uri");
+        
+        reg1.stop();
+        reg2.stop();
+        reg3.stop();
+        System.out.println(); // closed
+    }
+
+    private Endpoint assertExists(HazelcastEndpointRegistry reg, String uri) throws InterruptedException {
+        Endpoint ep = null;
+        int count = 0;
+        while (ep == null && count < 15) {
+            ep = reg.getEndpoint(uri);
+            if (ep == null) {
+                Thread.sleep(1000);
+                System.out.println(reg + ": tries=" + count);
+            }
+            count++;
+        }
+        Assert.assertNotNull(ep);
+        Assert.assertEquals(uri, ep.getURI());
+        return ep;
+    }
+
+    private RuntimeEndpoint createEndpoint(String uri) {
+        RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint();
+        Component comp = assemblyFactory.createComponent();
+        ep.setComponent(comp);
+        ep.setService(assemblyFactory.createComponentService());
+        Binding b = scaBindingFactory.createSCABinding();
+        ep.setBinding(b);
+        ep.setURI(uri);
+        return ep;
+    }
+    
+    private void print(String prefix, Endpoint ep) {
+        System.out.println(prefix + ": "+ep);
+    }
+
+    public void endpointAdded(Endpoint endpoint) {
+        print("Added", endpoint);
+    }
+
+    public void endpointRemoved(Endpoint endpoint) {
+        print("Removed", endpoint);
+    }
+
+    public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+        print("Updated", newEndpoint);
+    }
+
+}

Propchange: tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tuscany/sca-java-2.x/trunk/modules/endpoint-hazelcast/src/test/java/org/apache/tuscany/sca/endpoint/hazelcast/MultiRegTestCase.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/AbstractReplicatedMap.java Tue Jan 19 05:37:38 2010
@@ -708,7 +708,7 @@
                     // [rfeng] Change the behavior to replicate to all nodes
                     if (entry.isPrimary() && self.equals(entry.getPrimary())) {
                         try {
-                            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+                            Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                             entry.setBackupNodes(backup);
                             entry.setPrimary(self);
                         } catch (ChannelException x) {
@@ -768,7 +768,7 @@
                 if (log.isDebugEnabled())
                     log.debug("[1] Primary choosing a new backup");
                 try {
-                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                     entry.setBackupNodes(backup);
                     entry.setPrimary(channel.getLocalMember(false));
                 } catch (ChannelException x) {
@@ -798,7 +798,7 @@
                     entry.setPrimary(channel.getLocalMember(false));
                     entry.setBackup(false);
                     entry.setProxy(false);
-                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue(), entry.getBackupNodes());
+                    Member[] backup = publishEntryInfo(entry.getKey(), entry.getValue());
                     entry.setBackupNodes(backup);
                     if (mapOwner != null)
                         mapOwner.objectMadePrimay(entry.getKey(), entry.getValue());
@@ -833,7 +833,7 @@
         return members[node];
     }
 
-    protected abstract Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException;
+    protected abstract Member[] publishEntryInfo(Object key, Object value) throws ChannelException;
 
     public void heartbeat() {
         try {
@@ -916,7 +916,7 @@
                 }
                 if (entry.isBackup()) {
                     //select a new backup node
-                    backup = publishEntryInfo(key, entry.getValue(), entry.getBackupNodes());
+                    backup = publishEntryInfo(key, entry.getValue());
                 } else if (entry.isProxy()) {
                     //invalidate the previous primary
                     msg =
@@ -997,7 +997,7 @@
             old = remove(key);
         try {
             if (notify) {
-                Member[] backup = publishEntryInfo(key, value, entry.getBackupNodes());
+                Member[] backup = publishEntryInfo(key, value);
                 entry.setBackupNodes(backup);
             }
         } catch (ChannelException x) {

Modified: tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedEndpointRegistry.java Tue Jan 19 05:37:38 2010
@@ -38,11 +38,13 @@
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelReceiver;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.GroupChannel;
 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.catalina.tribes.membership.McastService;
 import org.apache.catalina.tribes.membership.StaticMember;
+import org.apache.catalina.tribes.transport.ReceiverBase;
 import org.apache.tuscany.sca.assembly.Endpoint;
 import org.apache.tuscany.sca.assembly.EndpointReference;
 import org.apache.tuscany.sca.core.ExtensionPointRegistry;
@@ -67,6 +69,9 @@
     private String address = MULTICAST_ADDRESS;
     private String bind = null;
     private int timeout = 50;
+    private int receiverPort = 4000;
+    private int receiverAutoBind = 100;
+    private List<URI> staticRoutes;
 
     private final static String DEFAULT_DOMAIN_URI = "http://tuscany.apache.org/sca/1.1/domains/default";
     private String domainURI = DEFAULT_DOMAIN_URI;
@@ -75,7 +80,6 @@
 
     private ExtensionPointRegistry registry;
     private ReplicatedMap map;
-    private static List<URI> staticRoutes;
 
     private String id;
     private boolean noMultiCast;
@@ -175,6 +179,14 @@
         if (mcast != null) {
             noMultiCast = Boolean.valueOf(mcast);
         }
+        String recvPort = attributes.get("receiverPort");
+        if (recvPort != null) {
+            receiverPort = Integer.parseInt(recvPort);
+        }
+        String recvAutoBind = attributes.get("receiverAutoBind");
+        if (recvAutoBind != null) {
+            receiverAutoBind = Integer.parseInt(recvAutoBind);
+        }
     }
 
     public void start() {
@@ -190,9 +202,24 @@
         if (noMultiCast) {
             map.getChannel().addInterceptor(new DisableMcastInterceptor());
         }
-        
-        // http://www.mail-archive.com/users@tomcat.apache.org/msg24873.html
-        int port = channel.getChannelReceiver().getPort();
+
+        // Configure the receiver ports
+        ChannelReceiver receiver = channel.getChannelReceiver();
+        if (receiver instanceof ReceiverBase) {
+            ((ReceiverBase)receiver).setAutoBind(receiverAutoBind);
+            ((ReceiverBase)receiver).setPort(receiverPort);
+        }
+
+        /*
+        Object sender = channel.getChannelSender();
+        if (sender instanceof ReplicationTransmitter) {
+            sender = ((ReplicationTransmitter)sender).getTransport();
+        }
+        if (sender instanceof AbstractSender) {
+            ((AbstractSender)sender).setKeepAliveCount(0);
+            ((AbstractSender)sender).setMaxRetryAttempts(5);
+        }
+        */
         
         if (staticRoutes != null) {
             StaticMembershipInterceptor smi = new StaticMembershipInterceptor();
@@ -200,12 +227,12 @@
                 Member member;
                 try {
                     // The port has to match the receiver port
-                    member = new StaticMember(staticRoute.getHost(), port, 5000);
+                    member = new StaticMember(staticRoute.getHost(), staticRoute.getPort(), 5000);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
                 smi.addStaticMember(member);
-                logger.info("Added static route: " + staticRoute.getHost() + ":" + port);
+                logger.info("Added static route: " + staticRoute.getHost() + ":" + staticRoute.getPort());
             }
             smi.setLocalMember(map.getChannel().getLocalMember(false));
             map.getChannel().addInterceptor(smi);
@@ -410,6 +437,7 @@
             logger.info(id + " Remote endpoint removed: " + entry.getValue());
         }
         Endpoint oldEp = (Endpoint)entry.getValue();
+        ((RuntimeEndpoint) oldEp).bind(registry, this);
         for (EndpointListener listener : listeners) {
             listener.endpointRemoved(oldEp);
         }

Modified: tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/main/java/org/apache/tuscany/sca/endpoint/tribes/ReplicatedMap.java Tue Jan 19 05:37:38 2010
@@ -17,6 +17,8 @@
 package org.apache.tuscany.sca.endpoint.tribes;
 
 import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
@@ -97,29 +99,65 @@
      * @return Member - the backup node
      * @throws ChannelException
      */
-    protected Member[] publishEntryInfo(Object key, Object value, Member[] backupNodes) throws ChannelException {
+    protected Member[] publishEntryInfo(Object key, Object value) throws ChannelException {
         if (!(key instanceof Serializable && value instanceof Serializable))
             return new Member[0];
         //select a backup node
-        Member[] backup = getMapMembers();
+        Member[] members = getMapMembers();
 
-        if (backup == null || backup.length == 0)
-            return null;
-
-        // Set the receivers to these members that are not in the backup nodes yet 
-        Member[] members = backup;
-        if (backupNodes != null) {
-            members = getMapMembersExcl(backupNodes);
+        if (members == null || members.length == 0) {
+            return new Member[0];
         }
 
         //publish the data out to all nodes
         MapMessage msg =
             new MapMessage(getMapContextName(), MapMessage.MSG_COPY, false, (Serializable)key, (Serializable)value,
-                           null, channel.getLocalMember(false), backup);
+                           null, channel.getLocalMember(false), members);
 
         getChannel().send(members, msg, getChannelSendOptions());
 
-        return backup;
+        return members;
+    }
+    
+    /**
+     * Override the base method to look up existing entries only
+     */
+    public Object get(Object key) {
+        MapEntry entry = super.getInternal(key);
+        if (log.isTraceEnabled())
+            log.trace("Requesting id:" + key + " entry:" + entry);
+        if (entry == null) {
+            return null;
+        }
+        return entry.getValue();
     }
 
+    /**
+     * Override the base method to remove all entries owned by the member that disappeared
+     */
+    public void memberDisappeared(Member member) {
+        boolean removed = false;
+        synchronized (mapMembers) {
+            removed = (mapMembers.remove(member) != null);
+            if (!removed) {
+                if (log.isDebugEnabled())
+                    log.debug("Member[" + member + "] disappeared, but was not present in the map.");
+                return; //the member was not part of our map.
+            }
+        }
+
+        Iterator<Map.Entry<Object, Object>> i = super.entrySetFull().iterator();
+        while (i.hasNext()) {
+            Map.Entry<Object, Object> e = i.next();
+            MapEntry entry = (MapEntry)super.getInternal(e.getKey());
+            if (entry == null) {
+                continue;
+            }
+            if (member.equals(entry.getPrimary())) {
+                if (log.isDebugEnabled())
+                    log.debug("[2] Primary disappeared");
+                i.remove();
+            } //end if
+        } //while
+    }    
 }

Modified: tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/endpoint-tribes/src/test/java/org/apache/tuscany/sca/endpoint/tribes/MultiRegTestCase.java Tue Jan 19 05:37:38 2010
@@ -19,6 +19,7 @@
 
 package org.apache.tuscany.sca.endpoint.tribes;
 
+import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -28,109 +29,141 @@
 import org.apache.tuscany.sca.assembly.Endpoint;
 import org.apache.tuscany.sca.assembly.SCABindingFactory;
 import org.apache.tuscany.sca.core.DefaultExtensionPointRegistry;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
 import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.runtime.EndpointListener;
+import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 // Ignore so its not run in the build yet till its working
-@Ignore
-public class MultiRegTestCase {
-
-//    @Test
-//    public void testTwoNodesMultiCast() throws InterruptedException {
-//        DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
-//        FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
-//        AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
-//
-//        ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
-//        reg1.start();
-//
-//        Endpoint ep1 = assemblyFactory.createEndpoint();
-//        Component comp = assemblyFactory.createComponent();
-//        ep1.setComponent(comp);
-//        ep1.setService(assemblyFactory.createComponentService());
-//        Binding b = new SCABindingFactoryImpl().createSCABinding();
-//        ep1.setBinding(b);
-//        ep1.setURI("ep1uri");
-//        reg1.addEndpoint(ep1);
-//
-//        Endpoint ep1p = reg1.getEndpoint("ep1uri");
-//        Assert.assertNotNull(ep1p);
-//        Assert.assertEquals("ep1uri", ep1p.getURI());
-//
-//        ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, null, "foo", "bar");
-//        reg2.start();
-//        Thread.sleep(5000);
-//
-//        Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
-//        Assert.assertNotNull(ep1p2);
-//        Assert.assertEquals("ep1uri", ep1p2.getURI());
-//
-//        reg1.stop();
-//        reg2.stop();
-//    }
+public class MultiRegTestCase implements EndpointListener {
+    private static ExtensionPointRegistry extensionPoints;
+    private static AssemblyFactory assemblyFactory;
+    private static SCABindingFactory scaBindingFactory;
+
+    @BeforeClass
+    public static void init() {
+        extensionPoints = new DefaultExtensionPointRegistry();
+        FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
+        assemblyFactory = factories.getFactory(AssemblyFactory.class);
+        scaBindingFactory = factories.getFactory(SCABindingFactory.class);
+    }
 
     @Test
-    public void testTwoNodesStaticNoMultiCast() throws InterruptedException {
-        DefaultExtensionPointRegistry extensionPoints = new DefaultExtensionPointRegistry();
-        FactoryExtensionPoint factories = extensionPoints.getExtensionPoint(FactoryExtensionPoint.class);
-        AssemblyFactory assemblyFactory = factories.getFactory(AssemblyFactory.class);
+    public void testReplication() throws Exception {
+        RuntimeEndpoint ep1 = createEndpoint("ep1uri");
+
+        String host = InetAddress.getLocalHost().getHostAddress();
+        String bind = null; // "9.65.158.31";
+        String port1 = "8085";
+        String port2 = "8086";
+        String port3 = "8087";
+        String range = "1";
 
         Map<String, String> attrs1 = new HashMap<String, String>();
-        attrs1.put("nomcast", "true");
-        attrs1.put("routes", "9.167.197.91:4001 9.167.197.91:4002");
+        // attrs1.put("nomcast", "true");
+        attrs1.put("bind", bind);
+        attrs1.put("receiverPort", port1);
+        attrs1.put("receiverAutoBind", range);
+        // attrs1.put("routes", host + ":" + port2 + " " + host + ":" + port3);
         ReplicatedEndpointRegistry reg1 = new ReplicatedEndpointRegistry(extensionPoints, attrs1, "foo", "bar");
+        reg1.addListener(this);
         reg1.start();
 
-        Endpoint ep1 = assemblyFactory.createEndpoint();
-        Component comp = assemblyFactory.createComponent();
-        ep1.setComponent(comp);
-        ep1.setService(assemblyFactory.createComponentService());
-        Binding b = factories.getFactory(SCABindingFactory.class).createSCABinding();
-        ep1.setBinding(b);
-        ep1.setURI("ep1uri");
-        reg1.addEndpoint(ep1);
-
-        Endpoint ep1p = reg1.getEndpoint("ep1uri");
-        Assert.assertNotNull(ep1p);
-        Assert.assertEquals("ep1uri", ep1p.getURI());
-
         Map<String, String> attrs2 = new HashMap<String, String>();
-        attrs2.put("nomcast", "true");
-        attrs2.put("routes", "9.167.197.91:4000");
+        // attrs2.put("nomcast", "true");
+        attrs1.put("bind", bind);
+        attrs2.put("receiverPort", port2);
+        attrs2.put("receiverAutoBind", range);
+        // attrs2.put("routes", host + ":"+port1);
         ReplicatedEndpointRegistry reg2 = new ReplicatedEndpointRegistry(extensionPoints, attrs2, "foo", "bar");
+        reg2.addListener(this);
         reg2.start();
-        
-        System.out.println("wait");
-        Thread.sleep(10000);
-        System.out.println("run");
-
-        Endpoint ep1p2 = reg2.getEndpoint("ep1uri");
-        Assert.assertNotNull(ep1p2);
-        Assert.assertEquals("ep1uri", ep1p2.getURI());
 
         Map<String, String> attrs3 = new HashMap<String, String>();
-        attrs3.put("nomcast", "true");
-        attrs3.put("routes", "9.167.197.91:4000");
+        // attrs3.put("nomcast", "true");
+        attrs1.put("bind", bind);
+        attrs3.put("receiverPort", port3);
+        attrs3.put("receiverAutoBind", range);
+        // attrs3.put("routes", host + ":"+port1);
         ReplicatedEndpointRegistry reg3 = new ReplicatedEndpointRegistry(extensionPoints, attrs3, "foo", "bar");
+        reg3.addListener(this);
         reg3.start();
-        
-        System.out.println("wait");
-        Thread.sleep(5000);
-        System.out.println("run");
-
-        Endpoint ep1p3 = reg3.getEndpoint("ep1uri");
-        Assert.assertNotNull(ep1p3);
-        Assert.assertEquals("ep1uri", ep1p3.getURI());
 
+        ep1.bind(extensionPoints, reg1);
+        reg1.addEndpoint(ep1);
+        assertExists(reg1, "ep1uri");
+        assertExists(reg2, "ep1uri");
+        assertExists(reg3, "ep1uri");
+
+        RuntimeEndpoint ep2 = createEndpoint("ep2uri");
+        ep2.bind(extensionPoints, reg2);
+        reg2.addEndpoint(ep2);
+        assertExists(reg2, "ep2uri");
+        assertExists(reg1, "ep2uri");
+        assertExists(reg3, "ep2uri");
+
+        reg1.stop();
+        Thread.sleep(6000);
+        Assert.assertNull(reg2.getEndpoint("ep1uri"));
+        Assert.assertNull(reg3.getEndpoint("ep1uri"));
+        assertExists(reg2, "ep2uri");
+        assertExists(reg3, "ep2uri");
+        
+        reg1.start();
+        ep1.bind(extensionPoints, reg1);
+        reg1.addEndpoint(ep1);
+        assertExists(reg1, "ep1uri");
+        assertExists(reg2, "ep1uri");
+        assertExists(reg3, "ep1uri");
         
-        System.out.println("wait2");
-        Thread.sleep(5000);
-        System.out.println("end");
         reg1.stop();
         reg2.stop();
         reg3.stop();
+        System.out.println(); // closed
+    }
+
+    private Endpoint assertExists(ReplicatedEndpointRegistry reg, String uri) throws InterruptedException {
+        Endpoint ep = null;
+        int count = 0;
+        while (ep == null && count < 15) {
+            ep = reg.getEndpoint(uri);
+            Thread.sleep(1000);
+            count++;
+            System.out.println(reg + ": tries=" + count);
+        }
+        Assert.assertNotNull(ep);
+        Assert.assertEquals(uri, ep.getURI());
+        return ep;
+    }
+
+    private RuntimeEndpoint createEndpoint(String uri) {
+        RuntimeEndpoint ep = (RuntimeEndpoint) assemblyFactory.createEndpoint();
+        Component comp = assemblyFactory.createComponent();
+        ep.setComponent(comp);
+        ep.setService(assemblyFactory.createComponentService());
+        Binding b = scaBindingFactory.createSCABinding();
+        ep.setBinding(b);
+        ep.setURI(uri);
+        return ep;
+    }
+    
+    private void print(String prefix, Endpoint ep) {
+        System.out.println(prefix + ": "+ep);
+    }
+
+    public void endpointAdded(Endpoint endpoint) {
+        print("Added", endpoint);
+    }
+
+    public void endpointRemoved(Endpoint endpoint) {
+        print("Removed", endpoint);
+    }
+
+    public void endpointUpdated(Endpoint oldEndpoint, Endpoint newEndpoint) {
+        print("Updated", newEndpoint);
     }
 
 }

Modified: tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/EndpointHelper.java Tue Jan 19 05:37:38 2010
@@ -69,6 +69,30 @@
         if (serviceID != null) {
             props.put(RemoteConstants.ENDPOINT_SERVICE_ID, Long.parseLong(serviceID));
         }
+
+        // FIXME: [rfeng] We need to calculate the intents supported by this endpoint
+        /*
+        QName bindingTypeName = endpoint.getBinding().getType();
+        Definitions definitions = null;
+        if(definitions!=null) {
+        for(BindingType bindingType: definitions.getBindingTypes()) {
+            if(bindingType.getType().equals(bindingTypeName)) {
+                bindingType.getAlwaysProvidedIntents();
+            }
+        }
+        */
+        
+        String intents = (String)props.get(RemoteConstants.SERVICE_EXPORTED_INTENTS);
+        String extraIntents = (String)props.get(RemoteConstants.SERVICE_EXPORTED_INTENTS_EXTRA);
+        if (intents == null) {
+            intents = "";
+        }
+        if (extraIntents != null) {
+            intents = intents + " " + extraIntents;
+        }
+
+        props.put(RemoteConstants.SERVICE_INTENTS, intents.trim());
+        
         props.put(RemoteConstants.ENDPOINT_ID, endpoint.getURI());
         // FIXME: [rfeng] How to pass in the remote service id from the endpoint XML
         props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, new String[] {"org.osgi.sca"});

Modified: tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/remoteserviceadmin/impl/RemoteServiceAdminImpl.java Tue Jan 19 05:37:38 2010
@@ -27,6 +27,12 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.UtilityExtensionPoint;
+import org.apache.tuscany.sca.definitions.Definitions;
+import org.apache.tuscany.sca.deployment.Deployer;
+import org.apache.tuscany.sca.policy.BindingType;
+import org.apache.tuscany.sca.policy.Intent;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
@@ -73,10 +79,28 @@
         importer.start();
         Hashtable<String, Object> props = new Hashtable<String, Object>();
         props.put(RemoteConstants.REMOTE_CONFIGS_SUPPORTED, new String[] {"org.osgi.sca"});
+        
+        ExtensionPointRegistry registry = exporter.getExtensionPointRegistry();
+        UtilityExtensionPoint utilities = registry.getExtensionPoint(UtilityExtensionPoint.class);
+        Deployer deployer = utilities.getUtility(Deployer.class);
+        Definitions definitions = deployer.getSystemDefinitions();
+
+        String[] intents = new String[definitions.getIntents().size()];
+        int i = 0;
+        for (Intent intent : definitions.getIntents()) {
+            intents[i++] = intent.toString();
+        }
+
+        String[] bindingTypes = new String[definitions.getBindingTypes().size()];
+        i = 0;
+        for (BindingType bindingType : definitions.getBindingTypes()) {
+            bindingTypes[i++] = bindingType.getType().toString();
+        }
+        
         // FIXME: We should ask SCA domain for the supported intents
-        props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, new String[] {});
+        props.put(RemoteConstants.REMOTE_INTENTS_SUPPORTED, intents);
         // FIXME: We should ask SCA domain for the supported binding types
-        props.put("org.osgi.sca.binding.types", new String[] {});
+        props.put("org.osgi.sca.binding.types", bindingTypes);
         registration = context.registerService(RemoteServiceAdmin.class.getName(), this, props);
         
         props = new Hashtable<String, Object>();

Modified: tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java?rev=900661&r1=900660&r2=900661&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/node-impl-osgi/src/main/java/org/apache/tuscany/sca/osgi/service/discovery/impl/DomainDiscoveryService.java Tue Jan 19 05:37:38 2010
@@ -53,6 +53,19 @@
         this.domainRegistryFactory =
             registry.getExtensionPoint(UtilityExtensionPoint.class).getUtility(DomainRegistryFactory.class);
         domainRegistryFactory.addListener(this);
+
+        // [rfeng] Starting of the endpoint registry takes a long time and it leaves the bundle
+        // state to be starting. When the registry is started, remote endpoints come in and that
+        // triggers the classloading from this bundle.
+        Thread thread = new Thread() {
+            public void run() {
+                startEndpointRegistry();
+            }
+        };
+        thread.start();
+    }
+
+    private void startEndpointRegistry() {
         // The following code forced the start() of the domain registry in absense of services
         String domainRegistry = context.getProperty("org.osgi.sca.domain.registry");
         if (domainRegistry == null) {
@@ -103,6 +116,8 @@
         {
             // Notify the endpoint listeners
             EndpointDescription description = createEndpointDescription(bundleContext, endpoint);
+            // Set the owning bundle to runtime bundle to avoid NPE
+            servicesInfo.put(description, context.getBundle());
             endpointChanged(description, ADDED);
         }
     }
@@ -115,6 +130,7 @@
         */ 
         {
             EndpointDescription description = createEndpointDescription(context, endpoint);
+            servicesInfo.remove(description);
             endpointChanged(description, REMOVED);
         }
     }