You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yoko-commits@incubator.apache.org by en...@apache.org on 2006/11/28 14:11:25 UTC

svn commit: r480052 - in /incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2: CorbaConduit.java CorbaServerConduit.java

Author: enolan
Date: Tue Nov 28 06:11:25 2006
New Revision: 480052

URL: http://svn.apache.org/viewvc?view=rev&rev=480052
Log:
Yoko 225 - Porting the yoko core binding to cxf - merging to branch.

Added:
    incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaServerConduit.java   (with props)
Modified:
    incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaConduit.java

Modified: incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaConduit.java
URL: http://svn.apache.org/viewvc/incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaConduit.java?view=diff&rev=480052&r1=480051&r2=480052
==============================================================================
--- incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaConduit.java (original)
+++ incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaConduit.java Tue Nov 28 06:11:25 2006
@@ -19,22 +19,46 @@
 
 package org.apache.yoko.bindings.corba2;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.BindingOperationInfo;
 import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.ServiceInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.schemas.yoko.bindings.corba.AddressType;
+import org.apache.schemas.yoko.bindings.corba.OperationType;
+import org.apache.schemas.yoko.bindings.corba.RaisesType;
+import org.apache.schemas.yoko.bindings.corba.TypeMappingType;
 import org.apache.yoko.wsdl.CorbaConstants;
 
+import org.omg.CORBA.Any;
+import org.omg.CORBA.Context;
+import org.omg.CORBA.ContextList;
+import org.omg.CORBA.ExceptionList;
+import org.omg.CORBA.NVList;
+import org.omg.CORBA.NamedValue;
 import org.omg.CORBA.ORB;
+import org.omg.CORBA.Request;
+import org.omg.CORBA.TypeCode;
 
 public class CorbaConduit implements Conduit {
     private static final Logger LOG = LogUtils.getL7dLogger(CorbaConduit.class);
@@ -74,14 +98,22 @@
             }            
             org.omg.CORBA.Object targetObject = CorbaUtils.importObjectReference(orb, address.getLocation());
             message.put(CorbaConstants.ORB, orb);
-            message.put(CorbaConstants.CORBA_ENDPOINT_OBJECT, targetObject);            
+            message.put(CorbaConstants.CORBA_ENDPOINT_OBJECT, targetObject);
+            message.setContent(OutputStream.class,
+                               new CorbaOutputStream(message, targetObject));
         } catch (java.lang.Exception ex) {
             LOG.log(Level.SEVERE, "Could not resolve target object");
             throw new CorbaBindingException(ex);
         }
     }
 
-    public void close(Message message) throws IOException {     
+    public void close(Message message) throws IOException {
+        System.out.println("in close message in corbaConduit");            
+        BindingOperationInfo boi = message.getExchange().get(BindingOperationInfo.class);
+        OperationType opType = boi.getExtensor(OperationType.class);
+        
+        buildRequest((CorbaMessage)message, opType);
+        message.getContent(OutputStream.class).close();
     }
 
     public EndpointReferenceType getTarget() {
@@ -89,10 +121,13 @@
     }
 
     public Destination getBackChannel() {
+        System.out.println("In CorbaConduit back channel.");
         return null;
     }
 
     public void close() {
+        System.out.println("in Close() in corbaConduit");
+        
     }
 
     public void setMessageObserver(MessageObserver observer) {
@@ -112,7 +147,153 @@
         return ref;
     }
 
-    private String getAddress() {
+    protected String getAddress() {
         return endpointInfo.getAddress();
+    }
+    
+        
+    
+    protected void buildRequest(CorbaMessage message, OperationType opType) {
+        try {
+            List<CorbaTypeMap> typeMaps = new ArrayList<CorbaTypeMap>();
+
+            ServiceInfo service = message.getExchange().get(ServiceInfo.class);
+            List<TypeMappingType> corbaTypes = service.getExtensors(TypeMappingType.class);
+            if (corbaTypes != null) {
+                CorbaUtils.createCorbaTypeMap(typeMaps, corbaTypes);
+            }
+
+            // Build the list of DII arguments, contexts, returns, and
+            // exceptions
+            CorbaStreamable[] arguments = message.getStreamableArguments();
+            NVList list = orb.create_list(arguments.length);
+
+            for (int i = 0; i < arguments.length; ++i) {
+                Any value = orb.create_any();
+                value.insert_Streamable(arguments[i]);
+                list.add_value(arguments[i].getName(), value, arguments[i].getMode());
+            }
+
+            ContextList ctxList = orb.create_context_list();
+            Context ctx = orb.get_default_context();
+
+            CorbaStreamable retVal = message.getStreamableReturn();
+            NamedValue ret = null;
+            if (retVal != null) {
+                Any returnAny = orb.create_any();
+                returnAny.insert_Streamable(retVal);
+                ret = orb.create_named_value(retVal.getName(), returnAny, org.omg.CORBA.ARG_OUT.value);
+            } else {
+                // TODO: REVISIT: for some reason, the yoko ORB does not like to
+                // have a null NamedValue
+                // return value. Create this 'empty' one if a void return type
+                // is used.
+                ret = orb.create_named_value("return", orb.create_any(), org.omg.CORBA.ARG_OUT.value);
+            }
+
+            // Get the typecodes for the exceptions this operation can throw.
+            // These are defined in the
+            // operation definition from WSDL.
+            ExceptionList exList = orb.create_exception_list();
+
+            Map<TypeCode, RaisesType> exceptions = getOperationExceptions(opType, typeMaps);
+            Object[] tcs = null;
+            if (exceptions != null) {
+                tcs = exceptions.keySet().toArray();
+            
+                for (int i = 0; i < exceptions.size(); ++i) {
+                    exList.add((TypeCode)tcs[i]);
+                }
+            }
+            org.omg.CORBA.Object targetObj = (org.omg.CORBA.Object)message
+                .get(CorbaConstants.CORBA_ENDPOINT_OBJECT);
+            Request request = targetObj._create_request(ctx, opType.getName(), list, ret, exList, ctxList);
+            request.invoke();
+
+        } catch (java.lang.Exception ex) {
+            ex.printStackTrace();
+        }
+    }
+    
+    protected Map<TypeCode, RaisesType> getOperationExceptions(OperationType operation, List<CorbaTypeMap> typeMaps) {
+        Map<TypeCode, RaisesType> exceptions = new HashMap<TypeCode, RaisesType>();
+        List<RaisesType> exList = operation.getRaises();     
+        int raiseSize = exList.size();
+
+        for (int i = 0; i < raiseSize; ++i) {
+            RaisesType ex = exList.get(i);
+            TypeCode tc = CorbaUtils.getTypeCode(orb, ex.getException(), typeMaps);
+            exceptions.put(tc, ex);
+        }
+
+        return exceptions;
+    }
+    
+    private class CorbaOutputStream extends AbstractCachedOutputStream {
+       
+        private Message message;
+        private boolean isOneWay;
+        org.omg.CORBA.Object targetObject;
+
+        CorbaOutputStream(Message m) {
+            message = m;        
+        }
+
+        CorbaOutputStream(Message m, org.omg.CORBA.Object Object) {
+            message = m;
+            org.omg.CORBA.Object targetObject = Object;
+        }
+
+        /**
+         * Perform any actions required on stream flush (freeze headers, reset
+         * output stream ... etc.)
+         */
+        public void doFlush() throws IOException {
+            // do nothing here
+        }
+
+        /**
+         * Perform any actions required on stream closure (handle response etc.)
+         */
+        public void doClose() throws IOException {
+            if (ContextUtils.isRequestor(message) && ContextUtils.isOutbound(message)) {
+                System.out.println("is requester and outbound message");
+                try {
+                    isOneWay = message.getExchange().isOneWay();
+                    
+                    if (!isOneWay) {                
+                        handleResponse();
+                        System.out.println("handled response");
+                    }
+                } catch (Exception ex) {
+                    LOG.log(Level.WARNING, "Connection failed with Exception : ", ex);
+                    throw new IOException(ex.toString());
+                }            
+            } else  if (!ContextUtils.isRequestor(message) && !ContextUtils.isOutbound(message)) {
+                System.out.println("is not requester and is inbound message");
+            } else if (!ContextUtils.isRequestor(message) && ContextUtils.isOutbound(message)) {
+                System.out.println("is not requester and is outbound message");
+            } if (ContextUtils.isRequestor(message) && !ContextUtils.isOutbound(message)) {
+                System.out.println("is requester and is inbound message");
+            }
+        }
+
+        public void onWrite() throws IOException {
+
+        }
+
+        public void handleResponse() throws IOException {
+            LOG.log(Level.FINE, "incoming observer is " + incomingObserver);
+            Exchange exchange = message.getExchange();            
+            MessageImpl inMessage = new MessageImpl();
+            CorbaDestination destination = new CorbaDestination(endpointInfo);
+            inMessage.setDestination(destination);
+            exchange.put(ORB.class, orb);
+            inMessage.setExchange(exchange);
+                        
+            LOG.log(Level.FINE, "incoming observer is " + incomingObserver);
+            incomingObserver.onMessage((Message)inMessage);          
+        }
+
     }
 }

Added: incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaServerConduit.java
URL: http://svn.apache.org/viewvc/incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaServerConduit.java?view=auto&rev=480052
==============================================================================
--- incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaServerConduit.java (added)
+++ incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaServerConduit.java Tue Nov 28 06:11:25 2006
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.yoko.bindings.corba2;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.io.AbstractCachedOutputStream;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.EndpointInfo;
+
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.AttributedURIType;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.schemas.yoko.bindings.corba.AddressType;
+import org.apache.yoko.wsdl.CorbaConstants;
+
+import org.omg.CORBA.Any;
+import org.omg.CORBA.NVList;
+import org.omg.CORBA.ORB;
+import org.omg.CORBA.ServerRequest;
+
+
+public class CorbaServerConduit implements Conduit {
+    private static final Logger LOG = LogUtils.getL7dLogger(CorbaConduit.class);
+
+    private EndpointInfo endpointInfo;
+    private EndpointReferenceType target;
+    private MessageObserver incomingObserver;
+    private ORB orb;
+
+    public CorbaServerConduit(EndpointInfo ei, EndpointReferenceType ref) {
+        endpointInfo = ei;
+        target = getTargetReference(ref);
+        // TODO: Set any additional properties needed to initialize the ORB  before
+        // we initialize it. We will get this information from the Celtix configuration 
+        // that is used with the current application.
+        java.util.Properties props = System.getProperties();
+        props.put("org.omg.CORBA.ORBClass", "org.apache.yoko.orb.CORBA.ORB");
+        props.put("org.omg.CORBA.ORBSingletonClass",
+                "org.apache.yoko.orb.CORBA.ORBSingleton");
+        props.put("yoko.orb.id", "Yoko-Client-Binding");
+
+        orb = ORB.init(new String[0], props);
+        if (orb == null) {
+            LOG.severe("Could not create instance of the ORB");
+            throw new CorbaBindingException("Could not create instance of the ORB");
+        }
+    }
+
+    public void send(Message message) throws IOException {
+        
+        System.out.println("send message...");
+        try {
+            AddressType address = endpointInfo.getExtensor(AddressType.class);
+
+            if (address == null) {
+                LOG.log(Level.SEVERE, "Unable to locate a valid CORBA address");
+                throw new CorbaBindingException("Unable to locate a valid CORBA address");
+            }            
+            org.omg.CORBA.Object targetObject = CorbaUtils.importObjectReference(orb, address.getLocation());
+            message.put(CorbaConstants.ORB, orb);
+            message.put(CorbaConstants.CORBA_ENDPOINT_OBJECT, targetObject);
+            message.setContent(OutputStream.class,
+                               new CorbaOutputStream(message));
+        } catch (java.lang.Exception ex) {
+            LOG.log(Level.SEVERE, "Could not resolve target object");
+            throw new CorbaBindingException(ex);
+        }
+    }
+
+    public void close(Message message) throws IOException {        
+        buildRequestResult((CorbaMessage)message);
+        message.getContent(OutputStream.class).close();
+    }
+
+    public EndpointReferenceType getTarget() {
+        return target;
+    }
+
+    public Destination getBackChannel() {
+        return null;
+    }
+
+    public void close() {
+    }
+
+    public void setMessageObserver(MessageObserver observer) {
+        incomingObserver = observer;
+    }
+
+    public EndpointReferenceType getTargetReference(EndpointReferenceType t) {
+        EndpointReferenceType ref = null;
+        if (null == t) {
+            ref = new EndpointReferenceType();
+            AttributedURIType address = new AttributedURIType();
+            address.setValue(getAddress());
+            ref.setAddress(address);
+        } else {
+            ref = t;
+        }
+        return ref;
+    }
+
+    protected String getAddress() {
+        return endpointInfo.getAddress();
+    }
+    
+    
+    protected void buildRequestResult(CorbaMessage msg) {
+        Exchange exg = msg.getExchange();        
+        ServerRequest request = exg.get(ServerRequest.class);
+        try {
+            if (!exg.isOneWay()) {                
+                CorbaMessage inMsg = (CorbaMessage)msg.getExchange().getInMessage();
+                NVList list = inMsg.getList();
+
+                if (msg.getStreamableException() != null) {                    
+                    Any exAny = orb.create_any();
+                    exAny.insert_Streamable(msg.getStreamableException());
+                    request.set_exception(exAny);
+                } else {
+                    CorbaStreamable[] arguments = msg.getStreamableArguments();
+                    for (int i = 0; i < arguments.length; ++i) {
+                        if (list.item(i).flags() != org.omg.CORBA.ARG_IN.value) {
+                            list.item(i).value().insert_Streamable(arguments[i]);
+                        }
+                    }
+
+                    CorbaStreamable resultValue = msg.getStreamableReturn();
+                    if (resultValue != null) {
+                        Any resultAny = orb.create_any();
+                        resultAny.insert_Streamable(resultValue);
+                        request.set_result(resultAny);
+                    }
+                }
+            }
+
+        } catch (java.lang.Exception ex) {
+            throw new CorbaBindingException("Exception during buildRequestResult", ex);
+        }
+    }        
+    
+    private class CorbaOutputStream extends AbstractCachedOutputStream {
+        
+        private Message outMessage;                
+
+        CorbaOutputStream(Message m) {
+            outMessage = m;        
+        }
+
+        CorbaOutputStream(Message m, org.omg.CORBA.Object Object) {
+            outMessage = m;            
+        }
+
+        /**
+         * Perform any actions required on stream flush (freeze headers, reset
+         * output stream ... etc.)
+         */
+        public void doFlush() throws IOException {
+
+            // do nothing here
+        }
+
+        /**
+         * Perform any actions required on stream closure (handle response etc.)
+         */
+        public void doClose() throws IOException {
+            System.out.println("In do close of outputstream");
+            if (ContextUtils.isRequestor(outMessage) && ContextUtils.isOutbound(outMessage)) {
+                System.out.println("is requester and outbound message");
+            } else  if (!ContextUtils.isRequestor(outMessage) && !ContextUtils.isOutbound(outMessage)) {
+                System.out.println("is not requester and is inbound message");
+            } else if (!ContextUtils.isRequestor(outMessage) && ContextUtils.isOutbound(outMessage)) {
+                System.out.println("is not requester and is outbound message");
+            } if (ContextUtils.isRequestor(outMessage) && !ContextUtils.isOutbound(outMessage)) {
+                System.out.println("is requester and is inbound message");
+            }
+        }
+
+        public void onWrite() throws IOException {
+
+        }
+    }
+}

Propchange: incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaServerConduit.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/yoko/branches/cxf_port/bindings/src/main/java/org/apache/yoko/bindings/corba2/CorbaServerConduit.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date