You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by jb...@apache.org on 2009/08/01 22:47:18 UTC

svn commit: r799923 - in /servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi: RmiBridgeEndpoint.java RmiComponent.java RmiConsumerEndpoint.java RmiProviderEndpoint.java

Author: jbonofre
Date: Sat Aug  1 20:47:18 2009
New Revision: 799923

URL: http://svn.apache.org/viewvc?rev=799923&view=rev
Log:
Add consumer, provider and bridge RMI endpoints.

Added:
    servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiBridgeEndpoint.java   (with props)
    servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiConsumerEndpoint.java   (with props)
    servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiProviderEndpoint.java   (with props)
Modified:
    servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiComponent.java

Added: servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiBridgeEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiBridgeEndpoint.java?rev=799923&view=auto
==============================================================================
--- servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiBridgeEndpoint.java (added)
+++ servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiBridgeEndpoint.java Sat Aug  1 20:47:18 2009
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.rmi;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.rmi.Remote;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.List;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
+
+/**
+ * <p>
+ * A special {@link org.apache.servicemix.common.endpoints.ConsumerEndpoint ConsumerEndpoint} that provides an RMI bridge.
+ * It waits for RMI method invocations (consumer side) and forwards each invocation to a remote RMI endpoint (provider side).
+ * WARNING: the method invocation never goes through the NMR; no normalized message is created, the call is forwarded directly from RMI to RMI.
+ * </p>
+ * 
+ * @author jbonofre
+ */
+public class RmiBridgeEndpoint extends ConsumerEndpoint implements RmiEndpointType, InvocationHandler {
+
+    private List<Class> remoteInterfaces; // the remote interfaces to use (consumer/provider)
+    private int localPort = 1099; // the local registry port number
+    private String name; // the endpoint name is the registries (consumer/provider)
+    private String bridgeHost; // the target RMI registry host name (provider)
+    private int bridgePort = 1099; // the target RMI registry port number (provider)
+    
+    private Registry localRegistry = null; // the local RMI registry (consumer)
+    private Remote localStub = null; // the local stub (consumer)
+    
+    private Registry bridgeRegistry = null; // the target RMI registry (provider)
+    private Remote bridgeStub = null; // the target stub (provider)
+    
+    private Remote proxy = null; // the remote proxy
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
+     */
+    @Override
+    public void validate() throws DeploymentException {
+        super.validate();
+        
+        // validate properties
+        if (bridgeHost == null || bridgeHost.trim().length() < 1) {
+            throw new DeploymentException("The bridge RMI host is mandatory.");
+        }
+        
+        if (name == null || name.trim().length() < 1) {
+            // if the user doesnt' define the RMI name, use the endpoint name
+            name = this.getEndpoint();
+        }
+        
+        // get the target remote RMI registry if required
+        if (bridgeRegistry == null) {
+            try {
+                bridgeRegistry = LocateRegistry.getRegistry(bridgeHost, bridgePort);
+            } catch (Exception e) {
+                throw new DeploymentException("Can't connect using RMI on " + bridgeHost + ":" + bridgePort, e);
+            }
+        }
+        
+        try {
+            // lookup the bridge stub
+            bridgeStub = bridgeRegistry.lookup(name);
+        } catch (Exception e) {
+            throw new DeploymentException("Remote object " + name + " lookup fails.", e);
+        }
+        
+        if (bridgeStub == null) {
+            throw new DeploymentException("The bridge remote object is not found.");
+        }
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#start()
+     */
+    @Override
+    public void start() throws Exception {
+        super.start();
+        
+        // create the dynamic proxy (consumer)
+        proxy = (Remote) Proxy.newProxyInstance(this.getClass().getClassLoader(), (Class[]) remoteInterfaces.toArray(), this);
+        // create the local stub (consumer)
+        localStub = UnicastRemoteObject.exportObject(proxy, localPort);
+        
+        try {
+            // create the local RMI registry if required
+            if (localRegistry == null) {
+                localRegistry = LocateRegistry.createRegistry(localPort);
+            }
+            // register the local stub into the RMI local registry
+            localRegistry.bind(name, localStub);
+        } catch (Exception e) {
+            // an error occurs, unbind the local stub
+            try {
+                localRegistry.unbind(name);
+            } catch (Throwable ignore) { }
+            localStub = null;
+            throw e;
+        }
+    }
+    
+    /**
+     * Stops the endpoint: removes the binding from the local registry, unexports the proxy and then
+     * stops the parent endpoint.
+     * <p>
+     * The RMI cleanup is best effort: a missing binding or a proxy that is not (or no longer) exported
+     * must not prevent {@code super.stop()} from running.
+     * </p>
+     *
+     * @throws Exception if the parent stop fails.
+     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
+     */
+    @Override
+    public void stop() throws Exception {
+        try {
+            // unbind the local stub
+            if (localRegistry != null) {
+                try {
+                    localRegistry.unbind(name);
+                } catch (Throwable ignore) { }
+            }
+            // unexport the proxy; it is null when start() never ran or failed
+            if (proxy != null) {
+                try {
+                    UnicastRemoteObject.unexportObject(proxy, true);
+                } catch (java.rmi.NoSuchObjectException ignore) {
+                    // the proxy is not exported, nothing to undo
+                }
+            }
+        } finally {
+            // always let the parent endpoint stop, even if the RMI cleanup failed
+            super.stop();
+        }
+    }
+    
+    /**
+     * Forwards an invocation received on the local proxy to the bridge (target) stub.
+     * <p>
+     * When the stub is an instance of the method's declaring class the method is invoked as is;
+     * otherwise a public method with the same name and parameter types is looked up on the stub class.
+     * An exception thrown by the target method is rethrown unwrapped, so that the RMI caller receives
+     * the exception declared by the remote interface instead of a reflection wrapper.
+     * </p>
+     *
+     * @param proxy the proxy instance the method was invoked on (not used).
+     * @param method the invoked interface method.
+     * @param args the invocation arguments, or <code>null</code> if the method takes none.
+     * @return the value returned by the target stub.
+     * @throws Throwable the exception thrown by the target method, or a reflection error.
+     * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
+     */
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        // the bridge is really made here
+        Method target = method;
+        if (!method.getDeclaringClass().isInstance(bridgeStub)) {
+            // not directly implemented by the stub: resolve the matching method on the stub class
+            target = bridgeStub.getClass().getMethod(method.getName(), method.getParameterTypes());
+        }
+        try {
+            return target.invoke(bridgeStub, args);
+        } catch (java.lang.reflect.InvocationTargetException e) {
+            // Method.invoke() wraps whatever the target throws; propagate the real cause
+            Throwable cause = e.getCause();
+            throw cause != null ? cause : e;
+        }
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.AbstractEndpoint#process(javax.jbi.messaging.MessageExchange)
+     */
+    @Override
+    public void process(MessageExchange exchange) throws Exception {
+        if (exchange.getStatus() == ExchangeStatus.DONE) {
+            // received DONE for a sent message
+            return;
+        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            // received ERROR state for a sent message
+            // there is no real error handling here for now
+            return;
+        } else {
+            throw new MessagingException("Unsupported exchange received...");
+        }
+    }
+     
+}

Propchange: servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiBridgeEndpoint.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiComponent.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiComponent.java?rev=799923&r1=799922&r2=799923&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiComponent.java (original)
+++ servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiComponent.java Sat Aug  1 20:47:18 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.servicemix.rmi;
 
+import java.util.List;
+
 import org.apache.servicemix.common.DefaultComponent;
 
 /**
@@ -26,6 +28,44 @@
  */
 public class RmiComponent extends DefaultComponent {
 
-    private RmiEndpointType[] endpoints;
+    private RmiEndpointType[] endpoints; // list of RMI endpoints
+    
+    /**
+     * <p>
+     * Returns the RMI endpoints configured on this component.
+     * </p>
+     * 
+     * @return the array of RMI endpoints, as set by {@link #setEndpoints(RmiEndpointType[])}.
+     */
+    public RmiEndpointType[] getEndpoints() {
+        return endpoints;
+    }
+    
+    /**
+     * <p>
+     * Defines the RMI endpoints configured on this component.
+     * </p>
+     * 
+     * @param endpoints the array of RMI endpoints; the array is stored as is, not copied.
+     */
+    public void setEndpoints(final RmiEndpointType[] endpoints) {
+        this.endpoints = endpoints;
+    }
+    
+    /**
+     * Returns the statically configured endpoints of this component as a list.
+     *
+     * @return the configured RMI endpoints, converted with the inherited <code>asList</code> helper.
+     * @see org.apache.servicemix.common.DefaultComponent#getConfiguredEndpoints()
+     */
+    @Override
+    protected List getConfiguredEndpoints() {
+        return asList(this.endpoints);
+    }
+    
+    /**
+     * Returns the endpoint classes supported by this component: provider, consumer and bridge.
+     *
+     * @return the supported RMI endpoint classes.
+     * @see org.apache.servicemix.common.DefaultComponent#getEndpointClasses()
+     */
+    @Override
+    protected Class[] getEndpointClasses() {
+        return new Class[] { RmiProviderEndpoint.class, RmiConsumerEndpoint.class, RmiBridgeEndpoint.class };
+    }
     
 }

Added: servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiConsumerEndpoint.java?rev=799923&view=auto
==============================================================================
--- servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiConsumerEndpoint.java (added)
+++ servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiConsumerEndpoint.java Sat Aug  1 20:47:18 2009
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.rmi;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.rmi.Remote;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.List;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+import org.apache.servicemix.common.endpoints.ConsumerEndpoint;
+
+/**
+ * <p>
+ * A {@link org.apache.servicemix.common.endpoints.ConsumerEndpoint ConsumerEndpoint} which uses RMI's {@link UnicastRemoteObject}
+ * to consume method invocations.
+ * </p>
+ * 
+ * @author jbonofre
+ */
+public class RmiConsumerEndpoint extends ConsumerEndpoint implements RmiEndpointType, InvocationHandler {
+
+    private List<Class> remoteInterfaces; // the remote interfaces
+    private int port = 1099; // the RMI registry port number
+    private String name; // the endpoint name into the RMI registry
+    
+    private Registry registry = null; // the RMI registry
+    private Remote stub; // the remote stub
+    private Remote proxy; // the remote proxy
+    
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.ConsumerEndpoint#validate()
+     */
+    @Override
+    public void validate() throws DeploymentException {
+        super.validate();
+        
+        if (name == null || name.trim().length() < 1) {
+            // if the user hasn't define the registry name, take the endpoint one
+            name = this.getEndpoint();
+        }
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#start()
+     */
+    @Override
+    public synchronized void start() throws Exception {
+        super.start();
+        // create the dynamic proxy
+        proxy = (Remote) Proxy.newProxyInstance(this.getClass().getClassLoader(), (Class[])remoteInterfaces.toArray(), this);
+        // create the remote stub
+        stub = UnicastRemoteObject.exportObject(proxy, port);
+        try {
+            // create a RMI registry if required
+            if (registry == null) {
+                registry = LocateRegistry.createRegistry(port);
+            }
+            // register the object into the registry
+            registry.bind(name, stub);
+        } catch (Exception e) {
+            // an error occurs, unexport the name from the registry
+            try {
+                registry.unbind(name);
+            } catch (Throwable ignore) { }
+            stub = null;
+            throw e;
+        }
+    }
+    
+    /**
+     * Stops the endpoint: removes the binding from the registry, unexports the proxy and then stops
+     * the parent endpoint.
+     * <p>
+     * The RMI cleanup is best effort: a missing binding or a proxy that is not (or no longer) exported
+     * must not prevent {@code super.stop()} from running.
+     * </p>
+     *
+     * @throws Exception if the parent stop fails.
+     * @see org.apache.servicemix.common.endpoints.SimpleEndpoint#stop()
+     */
+    @Override
+    public synchronized void stop() throws Exception {
+        try {
+            // unbind if required
+            if (registry != null) {
+                try {
+                    registry.unbind(name);
+                } catch (Throwable ignore) { }
+            }
+            // unexport the proxy; it is null when start() never ran or failed
+            if (proxy != null) {
+                try {
+                    UnicastRemoteObject.unexportObject(proxy, true);
+                } catch (java.rmi.NoSuchObjectException ignore) {
+                    // the proxy is not exported, nothing to undo
+                }
+            }
+        } finally {
+            // always let the parent endpoint stop, even if the RMI cleanup failed
+            super.stop();
+        }
+    }
+    
+    /**
+     * Receives the method invocations made on the exported proxy.
+     * <p>
+     * Placeholder implementation: the invocation is only logged and <code>null</code> is returned.
+     * Note that a dynamic proxy throws a NullPointerException to the caller when <code>null</code> is
+     * returned for an interface method with a primitive return type.
+     * </p>
+     *
+     * @param proxy the proxy instance the method was invoked on.
+     * @param method the invoked interface method.
+     * @param args the invocation arguments, or <code>null</code> if the method takes none.
+     * @return always <code>null</code> for now.
+     * @throws Throwable never thrown by the current implementation.
+     * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
+     */
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        // TODO transform RMI request into NMR message using a marshaler
+        // never concatenate the proxy itself: its toString() is dispatched back to this handler,
+        // which would recurse until a StackOverflowError; getClass() is not proxied
+        logger.debug("Proxy: " + proxy.getClass().getName());
+        logger.debug("Method: " + method);
+        // print the arguments content rather than the array identity (handles a null array)
+        logger.debug("Args: " + java.util.Arrays.toString(args));
+        return null;
+    }
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.AbstractEndpoint#process(javax.jbi.messaging.MessageExchange)
+     */
+    @Override
+    public void process(MessageExchange exchange) throws Exception {
+        if (exchange.getStatus() == ExchangeStatus.DONE) {
+            // received DONE for a sent message
+            return;
+        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            // received ERROR state for a sent message
+            // there is no real error handling here for now
+            return;
+        } else {
+            throw new MessagingException("Unsupported exchange received...");
+        }
+    }
+    
+}

Propchange: servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiConsumerEndpoint.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiProviderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiProviderEndpoint.java?rev=799923&view=auto
==============================================================================
--- servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiProviderEndpoint.java (added)
+++ servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiProviderEndpoint.java Sat Aug  1 20:47:18 2009
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.rmi;
+
+import java.rmi.Remote;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.transform.TransformerException;
+
+import org.apache.servicemix.common.endpoints.ProviderEndpoint;
+import org.apache.servicemix.jbi.helper.MessageUtil;
+
+/**
+ * <p>
+ * A {@link org.apache.servicemix.common.endpoints.ProviderEndpoint ProviderEndpoint} that calls an external RMI remote object.
+ * </p>
+ * 
+ * @author jbonofre
+ */
+public class RmiProviderEndpoint extends ProviderEndpoint implements RmiEndpointType {
+
+    private String host; // target RMI host name
+    private int port = 1099; // target RMI port number
+    private String name; // target name into the RMI registry
+    
+    private Registry registry = null; // the RMI registry
+    private Remote stub = null; // the remote stub
+    
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.AbstractEndpoint#validate()
+     */
+    @Override
+    public void validate() throws DeploymentException {
+        super.validate();
+        
+        if (host == null || host.trim().length() < 1) {
+            throw new DeploymentException("The RMI host is mandatory.");
+        }
+        
+        if (name == null || name.trim().length() < 1) {
+            // the user hasn't provide the RMI name, use the endpoint one
+            name = this.getEndpoint();
+        }
+        
+        // get the RMI registry, if required
+        if (registry == null) {
+            try {
+                registry = LocateRegistry.getRegistry(host, port);
+            } catch (Exception e) {
+                throw new DeploymentException("Can't connect using RMI on " + host + ":" + port, e);
+            }
+        }
+        
+        // lookup the stub
+        try {
+            stub = registry.lookup(name);
+        } catch (Exception e) {
+            throw new DeploymentException("Remote object " + name + " lookup fails.", e);
+        }
+        
+        if (stub == null) {
+            throw new DeploymentException("Remote object " + name + " is not found.");
+        }
+    }
+    
+    /**
+     * Handles an InOnly exchange by delegating to the private
+     * <code>process(MessageExchange, NormalizedMessage)</code> method.
+     *
+     * @param exchange the message exchange.
+     * @param inMsg the in message.
+     * @throws Exception if the processing fails.
+     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#processInOnly(javax.jbi.messaging.MessageExchange,
+     *      javax.jbi.messaging.NormalizedMessage)
+     */
+    @Override
+    protected void processInOnly(final MessageExchange exchange, final NormalizedMessage inMsg) throws Exception {
+        this.process(exchange, inMsg);
+    }
+
+    /**
+     * Handles an InOut exchange: processes the in message, then copies it (content, properties and
+     * attachments) into the out message.
+     *
+     * @param exchange the message exchange.
+     * @param in the in message.
+     * @param out the out message to populate.
+     * @throws Exception if the processing or the copy fails.
+     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#processInOut(javax.jbi.messaging.MessageExchange,
+     *      javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.NormalizedMessage)
+     */
+    @Override
+    protected void processInOut(final MessageExchange exchange, final NormalizedMessage in,
+                                final NormalizedMessage out) throws Exception {
+        // the source of the in message is read several times; without this a second read
+        // fails with "IOException: Stream closed"
+        MessageUtil.enableContentRereadability(in);
+
+        this.process(exchange, in);
+
+        // the message was delivered: the out message is simply a copy of the in message
+        MessageUtil.transfer(in, out);
+    }
+
+    /**
+     * <p>
+     * Processes the incoming exchange. This is currently a placeholder: the body is empty and the
+     * RMI call is not performed yet.
+     * </p>
+     * 
+     * @param exchange the message exchange.
+     * @param in the in message.
+     * @throws TransformerException on transformation errors.
+     * @throws MessagingException on messaging errors.
+     */
+    private void process(MessageExchange exchange, NormalizedMessage in) throws TransformerException, MessagingException {
+        // TODO parse the in message (using a marshaler) and transform it into an RMI call
+    }
+    
+    
+}

Propchange: servicemix/components/bindings/servicemix-rmi/trunk/src/main/java/org/apache/servicemix/rmi/RmiProviderEndpoint.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain