You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/08/19 20:08:54 UTC
svn commit: r805908 - in /cxf/trunk/rt/bindings/soap: ./
src/main/java/org/apache/cxf/binding/soap/
src/main/java/org/apache/cxf/binding/soap/tcp/
src/main/java/org/apache/cxf/binding/soap/tcp/frames/
src/test/java/org/apache/cxf/binding/soap/tcp/
Author: dkulp
Date: Wed Aug 19 18:08:53 2009
New Revision: 805908
URL: http://svn.apache.org/viewvc?rev=805908&view=rev
Log:
Add serverside support for soap/tcp
Added:
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/BackendTcpConduit.java (with props)
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/ChannelService.java (with props)
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpChannel.java (with props)
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpCodecFactory.java
- copied, changed from r805907, cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpDestination.java (with props)
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageDecoder.java (with props)
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageEncoder.java (with props)
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpProtocolConsts.java
- copied, changed from r805907, cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpSessionState.java
- copied, changed from r805907, cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpMessage.java (with props)
Modified:
cxf/trunk/rt/bindings/soap/pom.xml
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/SoapTransportFactory.java
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpOutputStream.java
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpUtils.java
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/TCPConduit.java
cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java
cxf/trunk/rt/bindings/soap/src/test/java/org/apache/cxf/binding/soap/tcp/TCPConduitTest.java
Modified: cxf/trunk/rt/bindings/soap/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/pom.xml?rev=805908&r1=805907&r2=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/pom.xml (original)
+++ cxf/trunk/rt/bindings/soap/pom.xml Wed Aug 19 18:08:53 2009
@@ -107,6 +107,12 @@
<artifactId>${saaj.impl.artifactId}</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.mina</groupId>
+ <artifactId>mina-core</artifactId>
+ <version>2.0.0-M6</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
Modified: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/SoapTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/SoapTransportFactory.java?rev=805908&r1=805907&r2=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/SoapTransportFactory.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/SoapTransportFactory.java Wed Aug 19 18:08:53 2009
@@ -36,6 +36,7 @@
import org.apache.cxf.Bus;
import org.apache.cxf.BusException;
import org.apache.cxf.binding.soap.model.SoapBindingInfo;
+import org.apache.cxf.binding.soap.tcp.SoapTcpDestination;
import org.apache.cxf.binding.soap.tcp.TCPConduit;
import org.apache.cxf.binding.soap.wsdl11.SoapAddressPlugin;
import org.apache.cxf.common.util.StringUtils;
@@ -80,6 +81,11 @@
}
public Destination getDestination(EndpointInfo ei) throws IOException {
+ if (ei.getAddress() != null && ei.getAddress().startsWith("soap.tcp")) {
+ return new SoapTcpDestination(ei.getTarget(), ei);
+ //return SoapTcpDestination.getInstance(ei.getTarget(), ei);
+ }
+
SoapBindingInfo binding = (SoapBindingInfo)ei.getBinding();
DestinationFactory destinationFactory;
try {
Added: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/BackendTcpConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/BackendTcpConduit.java?rev=805908&view=auto
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/BackendTcpConduit.java (added)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/BackendTcpConduit.java Wed Aug 19 18:08:53 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.binding.soap.tcp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpMessage;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.configuration.Configurable;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.ws.policy.Assertor;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+
+public class BackendTcpConduit extends AbstractConduit implements Configurable, Assertor {
+ private static final Logger LOG = LogUtils.getL7dLogger(TCPConduit.class);
+ private IoSession session;
+
+ public BackendTcpConduit(IoSession session) {
+ super(null);
+ this.session = session;
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return LOG;
+ }
+
+ public String getBeanName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void assertMessage(Message message) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean canAssert(QName type) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public void prepare(Message message) throws IOException {
+ message.setContent(OutputStream.class, new ByteArrayOutputStream(512));
+ }
+
+ @Override
+ public void close(Message msg) throws IOException {
+ ByteArrayOutputStream baos = (ByteArrayOutputStream)msg.getContent(OutputStream.class);
+ Exchange exchange = msg.getExchange();
+ SoapTcpChannel channel = exchange.getInMessage().getContent(SoapTcpChannel.class);
+ String message = new String(baos.toByteArray());
+ SoapTcpMessage soapTcpMessage = SoapTcpMessage.createSoapTcpMessage(message, channel.getChannelId());
+ IoBuffer buffer = IoBuffer.allocate(512);
+ buffer.setAutoExpand(true);
+ SoapTcpUtils.writeSoapTcpMessage(buffer.asOutputStream(), soapTcpMessage);
+ buffer.flip();
+ session.write(buffer);
+ }
+}
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/BackendTcpConduit.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/BackendTcpConduit.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/ChannelService.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/ChannelService.java?rev=805908&view=auto
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/ChannelService.java (added)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/ChannelService.java Wed Aug 19 18:08:53 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.binding.soap.tcp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpMessage;
+import org.apache.cxf.staxutils.StaxUtils;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+
+public final class ChannelService {
+ private ChannelService() {
+
+ }
+
+ public static void service(IoSession session, SoapTcpMessage message) {
+ try {
+ XMLStreamReader xmlReader
+ = StaxUtils.createXMLStreamReader(message.getContentAsStream(), "UTF-8");
+ while (xmlReader.hasNext()) {
+ xmlReader.next();
+ if (xmlReader.getEventType() == XMLStreamReader.START_ELEMENT) {
+ if (xmlReader.getLocalName().equals("initiateSession")) {
+ initiateSession(session);
+ } else if (xmlReader.getLocalName().equals("openChannel")) {
+ String targetWSURI = null;
+ List<String> negotiatedMimeTypes = new ArrayList<String>();
+ List<String> negotiatedParams = new ArrayList<String>();
+ while (xmlReader.hasNext()) {
+ xmlReader.next();
+ if (xmlReader.getEventType() == XMLStreamReader.START_ELEMENT) {
+ if (xmlReader.getLocalName().equals("targetWSURI")) {
+ targetWSURI = xmlReader.getElementText();
+ } else if (xmlReader.getLocalName().equals("negotiatedMimeTypes")) {
+ negotiatedMimeTypes.add(xmlReader.getElementText());
+ } else if (xmlReader.getLocalName().equals("negotiatedParams")) {
+ negotiatedParams.add(xmlReader.getElementText());
+ }
+ }
+ }
+ openChannel(session, targetWSURI, negotiatedMimeTypes, negotiatedParams);
+ } else if (xmlReader.getLocalName().equals("closeChannel")) {
+ int channelId = -1;
+ while (xmlReader.hasNext()) {
+ if (xmlReader.getEventType() == XMLStreamReader.START_ELEMENT
+ && xmlReader.getLocalName().equals("channelId")) {
+ channelId = Integer.parseInt(xmlReader.getElementText());
+ }
+ }
+ closeChannel(session, channelId);
+ }
+
+ }
+ }
+ } catch (XMLStreamException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void initiateSession(IoSession session) {
+ System.out.println("initiateSession service");
+ String response = "<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ + "<s:Body><initiateSessionResponse xmlns=\"http://servicechannel.tcp.transport.ws.xml.sun.com/\""
+ + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/"
+ + "XMLSchema\"/></s:Body></s:Envelope>";
+ SoapTcpMessage soapTcpMessage = SoapTcpMessage.createSoapTcpMessage(response, 0);
+ IoBuffer buffer = IoBuffer.allocate(512);
+ buffer.setAutoExpand(true);
+ try {
+ SoapTcpUtils.writeSoapTcpMessage(buffer.asOutputStream(), soapTcpMessage);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ buffer.flip();
+ session.write(buffer);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void openChannel(IoSession session, String targetWSURI, List<String> negotiatedMimeTypes,
+ List<String> negotiatedParams) {
+ System.out.println("openChannel service");
+ List<SoapTcpChannel> channels = (List<SoapTcpChannel>)session.getAttribute("channels");
+ int max = 0;
+ for (SoapTcpChannel channel : channels) {
+ if (channel.getChannelId() > max) {
+ max = channel.getChannelId();
+ }
+ }
+ channels.add(new SoapTcpChannel(max + 1, targetWSURI));
+
+ String response = "<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\"><s:Body>"
+ + "<openChannelResponse xmlns=\"http://servicechannel.tcp.transport.ws.xml.sun.com/\""
+ + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org"
+ + "/2001/XMLSchema\"><channelId xmlns=\"\">"
+ + (max + 1)
+ + "</channelId><negotiatedMimeTypes xmlns=\"\">"
+ + "application/soap+xml</negotiatedMimeTypes><negotiatedParams xmlns=\"\">charset</negotia"
+ + "tedParams><negotiatedParams xmlns=\"\">SOAPAction</negotiatedParams><negotiatedParams xm"
+ + "lns=\"\">action</negotiatedParams></openChannelResponse></s:Body></s:Envelope>";
+ SoapTcpMessage soapTcpMessage = SoapTcpMessage.createSoapTcpMessage(response, 0);
+ IoBuffer buffer = IoBuffer.allocate(512);
+ buffer.setAutoExpand(true);
+ try {
+ SoapTcpUtils.writeSoapTcpMessage(buffer.asOutputStream(), soapTcpMessage);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ buffer.flip();
+ session.write(buffer);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void closeChannel(IoSession session, int channelId) {
+ System.out.println("closeChannel service");
+ List<SoapTcpChannel> channels = (List<SoapTcpChannel>)session.getAttribute("channels");
+ for (SoapTcpChannel channel : channels) {
+ if (channel.getChannelId() == channelId) {
+ channels.remove(channel);
+ break;
+ }
+ }
+
+ String response = "<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ + "<s:Body><closeChannelResponse xmlns=\"http://servicechannel.tcp.transport.ws.xml.sun.com/\""
+ + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/"
+ + "XMLSchema\"/></s:Body></s:Envelope>";
+ SoapTcpMessage soapTcpMessage = SoapTcpMessage.createSoapTcpMessage(response, 0);
+ IoBuffer buffer = IoBuffer.allocate(512);
+ buffer.setAutoExpand(true);
+ try {
+ SoapTcpUtils.writeSoapTcpMessage(buffer.asOutputStream(), soapTcpMessage);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ buffer.flip();
+ session.write(buffer);
+ }
+}
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/ChannelService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/ChannelService.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpChannel.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpChannel.java?rev=805908&view=auto
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpChannel.java (added)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpChannel.java Wed Aug 19 18:08:53 2009
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.binding.soap.tcp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpFrame;
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpFrameHeader;
+
+public class SoapTcpChannel {
+ private int channelId;
+ private String wsURI;
+ private List<SoapTcpFrame> frames;
+
+ public SoapTcpChannel(int channelId, String wsURI) {
+ this.channelId = channelId;
+ this.wsURI = wsURI;
+ frames = new ArrayList<SoapTcpFrame>();
+ }
+
+ public boolean addFrame(SoapTcpFrame frame) {
+ if (frame != null && frame.getChannelId() == channelId) {
+ if (frame.getHeader().getFrameType() == SoapTcpFrameHeader.SINGLE_FRAME_MESSAGE) {
+ frames.clear();
+ }
+ frames.add(frame);
+ return true;
+ }
+ return false;
+ }
+
+ public List<SoapTcpFrame> getFrames() {
+ return frames;
+ }
+
+ public void clearFrameBuffer() {
+ frames.clear();
+ }
+
+
+ public int getChannelId() {
+ return channelId;
+ }
+
+ public void setChannelId(int channelId) {
+ this.channelId = channelId;
+ }
+
+ public String getWsURI() {
+ return wsURI;
+ }
+
+ public void setWsURI(String wsURI) {
+ this.wsURI = wsURI;
+ }
+}
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpChannel.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpChannel.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpCodecFactory.java (from r805907, cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java)
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpCodecFactory.java?p2=cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpCodecFactory.java&p1=cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java&r1=805907&r2=805908&rev=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpCodecFactory.java Wed Aug 19 18:08:53 2009
@@ -17,32 +17,28 @@
* under the License.
*/
-package org.apache.cxf.binding.soap.tcp.frames;
+package org.apache.cxf.binding.soap.tcp;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
-public class SoapTcpFrame {
- private int channelId;
- private SoapTcpFrameHeader header;
- private byte[] payload;
+public class SoapTcpCodecFactory implements ProtocolCodecFactory {
+ private SoapTcpMessageEncoder encoder;
+ private SoapTcpMessageDecoder decoder;
- public int getChannelId() {
- return channelId;
+ public SoapTcpCodecFactory() {
+ encoder = new SoapTcpMessageEncoder();
+ decoder = new SoapTcpMessageDecoder();
}
- public void setChannelId(int channelId) {
- this.channelId = channelId;
- }
- public SoapTcpFrameHeader getHeader() {
- return header;
- }
- public void setHeader(SoapTcpFrameHeader header) {
- this.header = header;
- }
- public byte[] getPayload() {
- return payload;
+
+ public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
+ return decoder;
}
- public void setPayload(byte[] payload) {
- this.payload = payload;
+
+ public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
+ return encoder;
}
-
-
+
}
Added: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpDestination.java?rev=805908&view=auto
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpDestination.java (added)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpDestination.java Wed Aug 19 18:08:53 2009
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.binding.soap.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpMessage;
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.transport.AbstractDestination;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+
+public final class SoapTcpDestination extends AbstractDestination implements IoHandler {
+ private static final String MAGIC_IDENTIFIER = "vnd.sun.ws.tcp";
+ private static final Logger LOG = LogUtils.getL7dLogger(SoapTcpDestination.class);
+
+ public SoapTcpDestination(EndpointReferenceType ref, EndpointInfo ei) throws IOException {
+ this(null, ref, ei);
+ }
+
+ public SoapTcpDestination(Bus b, EndpointReferenceType ref, EndpointInfo ei) throws IOException {
+ super(b, ref, ei);
+
+ String address = ref.getAddress().getValue();
+ if (address.contains("soap.tcp://")) {
+ //String endPointAddress = address;
+ int beginIndex = address.indexOf("://");
+ int endIndex = address.indexOf(":", beginIndex + 1);
+ //String hostName = address.substring(beginIndex + 3, endIndex);
+ beginIndex = endIndex;
+ endIndex = address.indexOf("/", beginIndex);
+ int port = Integer.parseInt(address.substring(beginIndex + 1, endIndex));
+ //System.out.println("hostName: " + hostName);
+ //System.out.println("port: " + port);
+
+ IoAcceptor acceptor = new NioSocketAcceptor();
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+ //acceptor.getFilterChain().addLast("LowLevelProtocol", new SoapTcpIoFilter());
+ acceptor.getFilterChain().addLast("HighLevelProtocol",
+ new ProtocolCodecFilter(new SoapTcpCodecFactory()));
+ acceptor.setDefaultLocalAddress(new InetSocketAddress(port));
+ acceptor.setHandler(this);
+ acceptor.bind();
+ System.out.println("server is listenig at port " + port);
+ }
+ }
+
+ @Override
+ protected Conduit getInbuiltBackChannel(Message inMessage) {
+ return inMessage.getExchange().getConduit(inMessage);
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return LOG;
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void messageReceived(IoSession session, Object message) throws Exception {
+ if (message instanceof SoapTcpMessage) {
+ BackendTcpConduit conduit = (BackendTcpConduit)session.getAttribute("conduit");
+ if (conduit == null) {
+ conduit = new BackendTcpConduit(session);
+ session.setAttribute("conduit", conduit);
+ }
+
+ if (((SoapTcpMessage)message).getChannelId() == 0) {
+ ChannelService.service(session, (SoapTcpMessage)message);
+ } else {
+ Message msg = new MessageImpl();
+ Exchange exchange = new ExchangeImpl();
+ exchange.setConduit(conduit);
+ exchange.setDestination(this);
+ msg.setExchange(exchange);
+ msg.setContent(InputStream.class, ((SoapTcpMessage)message).getContentAsStream());
+ msg.setContent(SoapTcpChannel.class, getChannel(session, (SoapTcpMessage)message));
+ msg.setContent(IoSession.class, session);
+ incomingObserver.onMessage(msg);
+ }
+ } else if (message instanceof IoBuffer) {
+ SoapTcpSessionState sessionState = (SoapTcpSessionState)session.getAttribute("sessionState");
+ if (sessionState != null
+ && sessionState.getStateId() == SoapTcpSessionState.SOAP_TCP_SESSION_STATE_NEW) {
+ IoBuffer buffer = (IoBuffer) message;
+ InputStream inStream = buffer.asInputStream();
+ byte magicIdBuffer[] = new byte[MAGIC_IDENTIFIER.length()];
+ inStream.read(magicIdBuffer);
+ String magicId = new String(magicIdBuffer, "US-ASCII");
+ if (magicId.equals(MAGIC_IDENTIFIER)) {
+ int version[] = new int[4];
+ DataCodingUtils.readInts4(inStream, version, 4);
+ if (version[0] == SoapTcpProtocolConsts.PROTOCOL_VERSION_MAJOR
+ && version[1] == SoapTcpProtocolConsts.PROTOCOL_VERSION_MINOR
+ && version[2] == SoapTcpProtocolConsts.CONNECTION_MANAGEMENT_VERSION_MAJOR
+ && version[3] == SoapTcpProtocolConsts.CONNECTION_MANAGEMENT_VERSION_MINOR) {
+ sessionState.setStateId(SoapTcpSessionState.SOAP_TCP_SESSION_STATE_AFTER_HANDSHAKE);
+ IoBuffer response = IoBuffer.allocate(2);
+ OutputStream out = response.asOutputStream();
+ DataCodingUtils.writeInts4(out, SoapTcpProtocolConsts.PROTOCOL_VERSION_MAJOR,
+ SoapTcpProtocolConsts.PROTOCOL_VERSION_MINOR,
+ SoapTcpProtocolConsts.CONNECTION_MANAGEMENT_VERSION_MAJOR,
+ SoapTcpProtocolConsts.CONNECTION_MANAGEMENT_VERSION_MINOR);
+ out.close();
+ response.flip();
+ session.write(response);
+ }
+ }
+ }
+ }
+ }
+
+ public void messageSent(IoSession session, Object message) throws Exception {
+ System.out.println("messageSent");
+
+ }
+
+ public void sessionClosed(IoSession session) throws Exception {
+ System.out.println("sessionClosed");
+
+ }
+
+ public void sessionCreated(IoSession session) throws Exception {
+ System.out.println("sessionCreated");
+
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
+ System.out.println("sessionIdle");
+
+ }
+
+ public void sessionOpened(IoSession session) throws Exception {
+ System.out.println("sessionOpened");
+ session.setAttribute("sessionState", new SoapTcpSessionState());
+ List<SoapTcpChannel> channels = new ArrayList<SoapTcpChannel>();
+ SoapTcpChannel channel0 = new SoapTcpChannel(0, "");
+ channels.add(channel0);
+ session.setAttribute("channels", channels);
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private SoapTcpChannel getChannel(IoSession session, SoapTcpMessage message) {
+ List<SoapTcpChannel> channels = (List<SoapTcpChannel>)session.getAttribute("channels");
+ if (channels != null) {
+ for (SoapTcpChannel channel : channels) {
+ if (channel.getChannelId() == message.getChannelId()) {
+ return channel;
+ }
+ }
+ }
+ return null;
+ }
+}
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpDestination.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageDecoder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageDecoder.java?rev=805908&view=auto
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageDecoder.java (added)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageDecoder.java Wed Aug 19 18:08:53 2009
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.binding.soap.tcp;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpFrame;
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpFrameHeader;
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpMessage;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+public class SoapTcpMessageDecoder extends ProtocolDecoderAdapter { //CumulativeProtocolDecoder {
+
+ @SuppressWarnings("unchecked")
+ public void decode(IoSession session, IoBuffer buffer, ProtocolDecoderOutput out)
+ throws Exception {
+
+ byte[] tempBuffer = (byte[])session.getAttribute("tempBuffer");
+ Integer bufferPosition = (Integer)session.getAttribute("bufferPosition");
+ Integer bufferDataLength = (Integer)session.getAttribute("bufferDataLength");
+ if (tempBuffer == null) {
+ tempBuffer = new byte[SoapTcpOutputStream.CHUNK_SIZE];
+ bufferDataLength = buffer.limit();
+ for (bufferPosition = new Integer(0); bufferPosition < bufferDataLength; bufferPosition++) {
+ tempBuffer[bufferPosition] = buffer.get();
+ }
+ session.setAttribute("tempBuffer", tempBuffer);
+ session.setAttribute("bufferPosition", bufferPosition);
+ session.setAttribute("bufferDataLength", bufferDataLength);
+ } else {
+ bufferDataLength += buffer.limit();
+ for (; bufferPosition < bufferDataLength; bufferPosition++) {
+ tempBuffer[bufferPosition] = buffer.get();
+ }
+ }
+
+ SoapTcpSessionState sessionState = (SoapTcpSessionState)session.getAttribute("sessionState");
+ if (sessionState != null
+ && sessionState.getStateId() == SoapTcpSessionState.SOAP_TCP_SESSION_STATE_NEW) {
+ if (bufferPosition == 16) {
+ out.write(IoBuffer.wrap(tempBuffer, 0, bufferPosition));
+ bufferPosition = 0;
+ bufferDataLength = 0;
+ session.setAttribute("bufferPosition", bufferPosition);
+ session.setAttribute("bufferDataLength", bufferDataLength);
+ return;
+ } else {
+ return;
+ }
+ }
+
+ InputStream inStream = new ByteArrayInputStream(tempBuffer, 0, bufferDataLength);
+ try {
+ SoapTcpFrame frame = SoapTcpUtils.readMessageFrame(inStream);
+ List<SoapTcpChannel> channels = (List<SoapTcpChannel>)session.getAttribute("channels");
+ for (SoapTcpChannel channel : channels) {
+ if (channel.getChannelId() == frame.getChannelId()) {
+ switch (frame.getHeader().getFrameType()) {
+ case SoapTcpFrameHeader.SINGLE_FRAME_MESSAGE:
+ case SoapTcpFrameHeader.ERROR_MESSAGE:
+ case SoapTcpFrameHeader.NULL_MESSAGE:
+ SoapTcpMessage singleFrameMessage = SoapTcpMessage.createSoapTcpMessage(frame);
+ out.write(singleFrameMessage);
+ bufferPosition = 0;
+ bufferDataLength = 0;
+ break;
+ case SoapTcpFrameHeader.MESSAGE_START_CHUNK:
+ case SoapTcpFrameHeader.MESSAGE_CHUNK:
+ channel.addFrame(frame);
+ bufferPosition = 0;
+ bufferDataLength = 0;
+ break;
+ case SoapTcpFrameHeader.MESSAGE_END_CHUNK:
+ List<SoapTcpFrame> frames = channel.getFrames();
+ SoapTcpMessage multiFrameMessage = SoapTcpMessage.createSoapTcpMessage(frames);
+ multiFrameMessage.getFrames().add(frame);
+ out.write(multiFrameMessage);
+ bufferPosition = 0;
+ bufferDataLength = 0;
+ break;
+ default:
+ return;
+ }
+ }
+ }
+ } catch (IOException ex) {
+ //
+ } finally {
+ session.setAttribute("bufferPosition", bufferPosition);
+ session.setAttribute("bufferDataLength", bufferDataLength);
+ }
+ }
+
+}
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageDecoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageDecoder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageEncoder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageEncoder.java?rev=805908&view=auto
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageEncoder.java (added)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageEncoder.java Wed Aug 19 18:08:53 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.binding.soap.tcp;
+
+import java.io.OutputStream;
+
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpMessage;
+import org.apache.mina.core.buffer.IoBuffer;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
+import org.apache.mina.filter.codec.ProtocolEncoderOutput;
+
+public class SoapTcpMessageEncoder extends ProtocolEncoderAdapter {
+
+ public void encode(IoSession session, Object obj, ProtocolEncoderOutput out) throws Exception {
+ if (obj instanceof SoapTcpMessage) {
+
+ SoapTcpMessage msg = (SoapTcpMessage)obj;
+ IoBuffer buffer = IoBuffer.allocate(1024);
+ buffer.setAutoExpand(true);
+ OutputStream outStream = buffer.asOutputStream();
+ SoapTcpUtils.writeSoapTcpMessage(outStream, msg);
+ outStream.close();
+ out.write(buffer);
+ }
+
+ }
+
+}
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageEncoder.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpMessageEncoder.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpOutputStream.java?rev=805908&r1=805907&r2=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpOutputStream.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpOutputStream.java Wed Aug 19 18:08:53 2009
@@ -128,8 +128,8 @@
final SoapTcpFrameHeader header =
new SoapTcpFrameHeader(SoapTcpFrameHeader.SINGLE_FRAME_MESSAGE, contentDesc);
final SoapTcpFrame frame = new SoapTcpFrame();
- frame.setChannelId(0);
frame.setHeader(header);
+ frame.setChannelId(0);
try {
frame.setPayload(openChannelMsg.getBytes("UTF-8"));
SoapTcpUtils.writeMessageFrame(outStream, frame);
@@ -174,8 +174,8 @@
new SoapTcpFrameHeader(SoapTcpFrameHeader.SINGLE_FRAME_MESSAGE, contentDesc);
header.setChannelId(channelId);
final SoapTcpFrame frame = new SoapTcpFrame();
- frame.setChannelId(channelId);
frame.setHeader(header);
+ frame.setChannelId(channelId);
frame.setPayload(this.buffer.toByteArray());
SoapTcpUtils.writeMessageFrame(outStream, frame);
messageSent = true;
@@ -206,8 +206,8 @@
new SoapTcpFrameHeader(SoapTcpFrameHeader.MESSAGE_START_CHUNK, contentDesc);
header.setChannelId(channelId);
SoapTcpFrame frame = new SoapTcpFrame();
- frame.setChannelId(channelId);
frame.setHeader(header);
+ frame.setChannelId(channelId);
frame.setPayload(this.buffer.toByteArray());
SoapTcpUtils.writeMessageFrame(outStream, frame);
messageSent = true;
Copied: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpProtocolConsts.java (from r805907, cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java)
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpProtocolConsts.java?p2=cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpProtocolConsts.java&p1=cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java&r1=805907&r2=805908&rev=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpProtocolConsts.java Wed Aug 19 18:08:53 2009
@@ -17,32 +17,18 @@
* under the License.
*/
-package org.apache.cxf.binding.soap.tcp.frames;
+package org.apache.cxf.binding.soap.tcp;
+public final class SoapTcpProtocolConsts {
-public class SoapTcpFrame {
- private int channelId;
- private SoapTcpFrameHeader header;
- private byte[] payload;
+ public static final String MAGIC_IDENTIFIER = "vnd.sun.ws.tcp";
+ public static final int PROTOCOL_VERSION_MAJOR = 1;
+ public static final int PROTOCOL_VERSION_MINOR = 0;
+ public static final int CONNECTION_MANAGEMENT_VERSION_MAJOR = 1;
+ public static final int CONNECTION_MANAGEMENT_VERSION_MINOR = 0;
- public int getChannelId() {
- return channelId;
+ private SoapTcpProtocolConsts() {
+
}
- public void setChannelId(int channelId) {
- this.channelId = channelId;
- }
- public SoapTcpFrameHeader getHeader() {
- return header;
- }
- public void setHeader(SoapTcpFrameHeader header) {
- this.header = header;
- }
- public byte[] getPayload() {
- return payload;
- }
- public void setPayload(byte[] payload) {
- this.payload = payload;
- }
-
-
+
}
Copied: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpSessionState.java (from r805907, cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java)
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpSessionState.java?p2=cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpSessionState.java&p1=cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java&r1=805907&r2=805908&rev=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpSessionState.java Wed Aug 19 18:08:53 2009
@@ -17,31 +17,25 @@
* under the License.
*/
-package org.apache.cxf.binding.soap.tcp.frames;
+package org.apache.cxf.binding.soap.tcp;
-
-public class SoapTcpFrame {
- private int channelId;
- private SoapTcpFrameHeader header;
- private byte[] payload;
+public class SoapTcpSessionState {
+ public static final byte SOAP_TCP_SESSION_STATE_NEW = 0;
+ public static final byte SOAP_TCP_SESSION_STATE_AFTER_HANDSHAKE = 1;
+ public static final byte SOAP_TCP_SESSION_STATE_INITIATED = 2;
- public int getChannelId() {
- return channelId;
- }
- public void setChannelId(int channelId) {
- this.channelId = channelId;
- }
- public SoapTcpFrameHeader getHeader() {
- return header;
- }
- public void setHeader(SoapTcpFrameHeader header) {
- this.header = header;
+ private byte stateId;
+
+ public SoapTcpSessionState() {
+ stateId = SOAP_TCP_SESSION_STATE_NEW;
}
- public byte[] getPayload() {
- return payload;
+
+ public byte getStateId() {
+ return stateId;
}
- public void setPayload(byte[] payload) {
- this.payload = payload;
+
+ public void setStateId(byte stateId) {
+ this.stateId = stateId;
}
Modified: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpUtils.java?rev=805908&r1=805907&r2=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpUtils.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/SoapTcpUtils.java Wed Aug 19 18:08:53 2009
@@ -35,6 +35,7 @@
import org.apache.cxf.binding.soap.tcp.frames.SoapTcpFrame;
import org.apache.cxf.binding.soap.tcp.frames.SoapTcpFrameContentDescription;
import org.apache.cxf.binding.soap.tcp.frames.SoapTcpFrameHeader;
+import org.apache.cxf.binding.soap.tcp.frames.SoapTcpMessage;
import org.apache.cxf.staxutils.StaxUtils;
public final class SoapTcpUtils {
@@ -43,6 +44,13 @@
}
+ public static void writeSoapTcpMessage(final OutputStream out, final SoapTcpMessage msg)
+ throws IOException {
+ for (SoapTcpFrame frame : msg.getFrames()) {
+ writeMessageFrame(out, frame);
+ }
+ }
+
/**
* Method that writes single SoapTcpFrame
* @param out
@@ -101,42 +109,41 @@
final int payloadLength = DataCodingUtils.readInt8(inputStream);
final byte payload[] = new byte[payloadLength];
- inputStream.read(payload, 0, payload.length);
+ if (inputStream.read(payload, 0, payload.length) != payloadLength) {
+ throw new IOException();
+ }
frame.setPayload(payload);
return frame;
}
- private static SoapTcpFrameContentDescription readContentDescription(final InputStream inputStream) {
+ private static SoapTcpFrameContentDescription readContentDescription(final InputStream inputStream)
+ throws IOException {
final int response[] = new int[2];
- try {
- DataCodingUtils.readInts4(inputStream, response, 2); //[0] content-id, [1] number-of-parameters
-
- final SoapTcpFrameContentDescription contentDesc = new SoapTcpFrameContentDescription();
- contentDesc.setContentId(response[0]);
- final int numOfParams = response[1];
-
- final Map<Integer, String> parameters = new Hashtable<Integer, String>();
- for (int i = 0; i < numOfParams; i++) {
- DataCodingUtils.readInts4(inputStream, response, 2); //[0] parameter-id, [1] string-length
- if (response[1] > 0) {
- final byte[] buffer = new byte[response[1]];
- if (inputStream.read(buffer) > 0) {
- final String value = new String(buffer, "UTF-8");
- parameters.put(Integer.valueOf(response[0]), value);
- //System.out.println("parameter-id = " + response[0] + " parameter-value = " + value);
- }
+ DataCodingUtils.readInts4(inputStream, response, 2); //[0] content-id, [1] number-of-parameters
+
+ final SoapTcpFrameContentDescription contentDesc = new SoapTcpFrameContentDescription();
+ contentDesc.setContentId(response[0]);
+ final int numOfParams = response[1];
+
+ final Map<Integer, String> parameters = new Hashtable<Integer, String>();
+ for (int i = 0; i < numOfParams; i++) {
+ DataCodingUtils.readInts4(inputStream, response, 2); //[0] parameter-id, [1] string-length
+ if (response[1] > 0) {
+ final byte[] buffer = new byte[response[1]];
+ if (inputStream.read(buffer) > 0) {
+ final String value = new String(buffer, "UTF-8");
+ parameters.put(Integer.valueOf(response[0]), value);
+ //System.out.println("parameter-id = " + response[0] + " parameter-value = " + value);
}
}
- contentDesc.setParameters(parameters);
-
- return contentDesc;
- } catch (IOException e) {
- e.printStackTrace();
- return null;
}
+ contentDesc.setParameters(parameters);
+
+ return contentDesc;
}
+
/**
* Method that parse SoapTcpFrame payload to find important tag.
*
Modified: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/TCPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/TCPConduit.java?rev=805908&r1=805907&r2=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/TCPConduit.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/TCPConduit.java Wed Aug 19 18:08:53 2009
@@ -47,20 +47,11 @@
private static final Logger LOG = LogUtils.getL7dLogger(TCPConduit.class);
- private static final String MAGIC_IDENTIFIER = "vnd.sun.ws.tcp";
- private static final int PROTOCOL_VERSION_MAJOR = 1;
- private static final int PROTOCOL_VERSION_MINOR = 0;
- private static final int CONNECTION_MANAGEMENT_VERSION_MAJOR = 1;
- private static final int CONNECTION_MANAGEMENT_VERSION_MINOR = 0;
-
private Socket socket;
private InputStream in;
private OutputStream out;
private String endPointAddress;
- public TCPConduit(EndpointInfo t) throws IOException {
- this(t.getTarget());
- }
public TCPConduit(EndpointReferenceType t) throws IOException {
super(t);
@@ -84,10 +75,11 @@
in = socket.getInputStream();
out = socket.getOutputStream();
- out.write(MAGIC_IDENTIFIER.getBytes("US-ASCII"));
- DataCodingUtils.writeInts4(out, PROTOCOL_VERSION_MAJOR, PROTOCOL_VERSION_MINOR,
- CONNECTION_MANAGEMENT_VERSION_MAJOR,
- CONNECTION_MANAGEMENT_VERSION_MINOR);
+ out.write(SoapTcpProtocolConsts.MAGIC_IDENTIFIER.getBytes("US-ASCII"));
+ DataCodingUtils.writeInts4(out, SoapTcpProtocolConsts.PROTOCOL_VERSION_MAJOR,
+ SoapTcpProtocolConsts.PROTOCOL_VERSION_MINOR,
+ SoapTcpProtocolConsts.CONNECTION_MANAGEMENT_VERSION_MAJOR,
+ SoapTcpProtocolConsts.CONNECTION_MANAGEMENT_VERSION_MINOR);
out.flush();
final int version[] = new int[4];
@@ -101,6 +93,10 @@
initSession();
}
+ public TCPConduit(EndpointInfo ei) throws IOException {
+ this(ei.getTarget());
+ }
+
private void initSession() throws IOException {
final String initSessionMessage = "<s:Envelope xmlns:s=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<s:Body><initiateSession xmlns=\"http://servicechannel.tcp.transport.ws.xml.sun.com/\""
@@ -124,8 +120,8 @@
final SoapTcpFrameHeader header =
new SoapTcpFrameHeader(SoapTcpFrameHeader.SINGLE_FRAME_MESSAGE, contentDesc);
SoapTcpFrame frame = new SoapTcpFrame();
- frame.setChannelId(0);
frame.setHeader(header);
+ frame.setChannelId(0);
frame.setPayload(initSessionMessageBytes);
try {
SoapTcpUtils.writeMessageFrame(out, frame);
Modified: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java?rev=805908&r1=805907&r2=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java (original)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpFrame.java Wed Aug 19 18:08:53 2009
@@ -19,17 +19,15 @@
package org.apache.cxf.binding.soap.tcp.frames;
-
public class SoapTcpFrame {
- private int channelId;
private SoapTcpFrameHeader header;
private byte[] payload;
public int getChannelId() {
- return channelId;
+ return header.getChannelId();
}
public void setChannelId(int channelId) {
- this.channelId = channelId;
+ this.header.setChannelId(channelId);
}
public SoapTcpFrameHeader getHeader() {
return header;
@@ -43,6 +41,4 @@
public void setPayload(byte[] payload) {
this.payload = payload;
}
-
-
}
Added: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpMessage.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpMessage.java?rev=805908&view=auto
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpMessage.java (added)
+++ cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpMessage.java Wed Aug 19 18:08:53 2009
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.binding.soap.tcp.frames;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cxf.binding.soap.tcp.DataCodingUtils;
+import org.apache.cxf.binding.soap.tcp.SoapTcpOutputStream;
+
+public final class SoapTcpMessage {
+ private List<SoapTcpFrame> frames;
+
+ private SoapTcpMessage() {
+ frames = new ArrayList<SoapTcpFrame>();
+ }
+
+ public static SoapTcpMessage createSoapTcpMessage(SoapTcpFrame frame) {
+ SoapTcpMessage soapTcpMessage = new SoapTcpMessage();
+ soapTcpMessage.getFrames().add(frame);
+ return soapTcpMessage;
+ }
+
+ public static SoapTcpMessage createSoapTcpMessage(List<SoapTcpFrame> frames) {
+ SoapTcpMessage soapTcpMessage = new SoapTcpMessage();
+ soapTcpMessage.getFrames().addAll(frames);
+ return soapTcpMessage;
+ }
+
+ public static SoapTcpMessage createSoapTcpMessage(String message, int channelId) {
+ SoapTcpMessage soapTcpMessage = new SoapTcpMessage();
+ try {
+ byte[] msgContent = message.getBytes("UTF-8");
+ int numOfFrames = (int)Math.ceil((float)msgContent.length
+ / (float)SoapTcpOutputStream.CHUNK_SIZE);
+ if (numOfFrames > 1) {
+ int offset = 0;
+ byte[] payload = new byte[SoapTcpOutputStream.CHUNK_SIZE];
+ for (int i = 1; i <= numOfFrames; i++) {
+ if (i == numOfFrames) {
+ payload = new byte[msgContent.length % SoapTcpOutputStream.CHUNK_SIZE];
+ }
+
+ for (int j = 0; j < payload.length; j++) {
+ payload[j] = msgContent[offset + j];
+ }
+
+ SoapTcpFrame frame = null;
+ if (i == 1) {
+ frame = createSoapTcpFrame(SoapTcpFrameHeader.MESSAGE_START_CHUNK,
+ payload, channelId);
+ } else if (i < numOfFrames) {
+ frame = createSoapTcpFrame(SoapTcpFrameHeader.MESSAGE_CHUNK, payload, channelId);
+ } else {
+ frame = createSoapTcpFrame(SoapTcpFrameHeader.MESSAGE_END_CHUNK, payload, channelId);
+ }
+
+ soapTcpMessage.frames.add(frame);
+ offset += SoapTcpOutputStream.CHUNK_SIZE;
+ }
+
+ } else {
+ soapTcpMessage.frames.
+ add(createSoapTcpFrame(SoapTcpFrameHeader.SINGLE_FRAME_MESSAGE, msgContent, channelId));
+ }
+ return soapTcpMessage;
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ public static SoapTcpMessage createErrorMessage(int code, int subCode, String description,
+ int channelId) {
+ SoapTcpMessage soapTcpMessage = new SoapTcpMessage();
+ SoapTcpFrame frame = new SoapTcpFrame();
+ SoapTcpFrameHeader header = new SoapTcpFrameHeader();
+ header.setChannelId(channelId);
+ header.setFrameType(SoapTcpFrameHeader.ERROR_MESSAGE);
+ frame.setHeader(header);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ DataCodingUtils.writeInts4(baos, code, subCode);
+ byte[] strByteArray = description.getBytes("UTF-8");
+ DataCodingUtils.writeInt8(baos, strByteArray.length);
+ baos.write(strByteArray);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ frame.setPayload(baos.toByteArray());
+ soapTcpMessage.getFrames().add(frame);
+
+ return soapTcpMessage;
+ }
+
+ public void setChannelId(int channelId) {
+ for (SoapTcpFrame frame : frames) {
+ frame.setChannelId(channelId);
+ }
+ }
+
+ public int getChannelId() {
+ if (frames.size() > 0) {
+ return frames.get(0).getChannelId();
+ }
+ return -1;
+ }
+
+ public void setFrames(List<SoapTcpFrame> frames) {
+ this.frames = frames;
+ }
+
+ public List<SoapTcpFrame> getFrames() {
+ return frames;
+ }
+
+ public String getContent() {
+ StringBuffer result = new StringBuffer();
+
+ try {
+ for (SoapTcpFrame frame : frames) {
+ result.append(new String(frame.getPayload(), "UTF-8"));
+ }
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+
+ return result.toString();
+ }
+
+ public InputStream getContentAsStream() {
+ int buffLength = 0;
+ for (SoapTcpFrame frame : frames) {
+ buffLength += frame.getPayload().length;
+ }
+ byte buffer[] = new byte[buffLength];
+ int index = 0;
+ byte payload[] = null;
+ for (SoapTcpFrame frame : frames) {
+ payload = frame.getPayload();
+ for (int i = 0; i < payload.length; i++) {
+ buffer[index] = payload[i];
+ index++;
+ }
+ }
+ return new ByteArrayInputStream(buffer);
+ }
+
+ private static SoapTcpFrame createSoapTcpFrame(int frameType, byte[] payload, int channelId) {
+ SoapTcpFrame frame = new SoapTcpFrame();
+ SoapTcpFrameHeader header = new SoapTcpFrameHeader();
+ SoapTcpFrameContentDescription contentDesc = null;
+ if (frameType == SoapTcpFrameHeader.SINGLE_FRAME_MESSAGE
+ || frameType == SoapTcpFrameHeader.MESSAGE_START_CHUNK) {
+ contentDesc = new SoapTcpFrameContentDescription();
+ contentDesc.setContentId(0);
+
+ final Map<Integer, String> parameters = new Hashtable<Integer, String>();
+ parameters.put(0, "utf-8");
+
+ contentDesc.setParameters(parameters);
+ }
+ header.setChannelId(channelId);
+ header.setFrameType(frameType);
+ header.setContentDescription(contentDesc);
+ frame.setHeader(header);
+ frame.setPayload(payload);
+ return frame;
+ }
+}
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/tcp/frames/SoapTcpMessage.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/rt/bindings/soap/src/test/java/org/apache/cxf/binding/soap/tcp/TCPConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/bindings/soap/src/test/java/org/apache/cxf/binding/soap/tcp/TCPConduitTest.java?rev=805908&r1=805907&r2=805908&view=diff
==============================================================================
--- cxf/trunk/rt/bindings/soap/src/test/java/org/apache/cxf/binding/soap/tcp/TCPConduitTest.java (original)
+++ cxf/trunk/rt/bindings/soap/src/test/java/org/apache/cxf/binding/soap/tcp/TCPConduitTest.java Wed Aug 19 18:08:53 2009
@@ -19,7 +19,7 @@
package org.apache.cxf.binding.soap.tcp;
-import java.io.ByteArrayInputStream;
+//import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -28,13 +28,13 @@
import java.util.List;
import java.util.Map;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
+//import javax.xml.stream.XMLStreamException;
+//import javax.xml.stream.XMLStreamReader;
import org.apache.cxf.binding.soap.SoapMessage;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
-import org.apache.cxf.staxutils.StaxUtils;
+//import org.apache.cxf.staxutils.StaxUtils;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
@@ -42,6 +42,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
+//import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
@@ -64,8 +65,8 @@
@Test
public void testPrepare() {
- int num1 = 2;
- int num2 = 3;
+ //int num1 = 2;
+ //int num2 = 3;
/*
final String messageData = "<s:Envelope xmlns:s=\"http://www.w3.org/2003/05/soap-envelope\""
+ " xmlns:a=\"http://www.w3.org/2005/08/addressing\"><s:Header><a:Action s:mustUnderstand=\"1\">"
@@ -80,12 +81,19 @@
*/
- final String messageData = "<S:Envelope xmlns:S=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ /*final String messageData = "<S:Envelope xmlns:S=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ "<S:Body><ns2:add xmlns:ns2=\"http://calculator.me.org/\"><i>"
- + num1 + "</i><j>" + num2 + "</j></ns2:add></S:Body></S:Envelope>";
+ + num1 + "</i><j>" + num2 + "</j></ns2:add></S:Body></S:Envelope>";*/
+ String name = new String("CXF");
+ /*for (int i = 0; i < 6000; i++) {
+ name += "A";
+ }*/
+ final String messageData = "<S:Envelope xmlns:S=\"http://schemas.xmlsoap.org/soap/envelope/\">"
+ + "<S:Body><sayHi><text>" + name + "</text></sayHi></S:Body></S:Envelope>";
final AttributedURIType a = new AttributedURIType();
- a.setValue("soap.tcp://localhost:8080/CalculatorApp/CalculatorWSService");
+ //a.setValue("soap.tcp://localhost:8080/CalculatorApp/CalculatorWSService");
+ a.setValue("soap.tcp://localhost:9999/HelloWorld");
final EndpointReferenceType t = new EndpointReferenceType();
t.setAddress(a);
@@ -123,18 +131,20 @@
private class TestMessageObserver implements MessageObserver {
public void onMessage(final Message message) {
- int correctResult = 5;
+ //int correctResult = 5;
assertNotNull(message);
InputStream input = message.getContent(InputStream.class);
byte response[] = null;
try {
response = new byte[input.available()];
input.read(response);
+ String s = new String(response, "UTF-8");
+ System.out.println(s);
} catch (IOException e) {
e.printStackTrace();
}
- try {
+ /*try {
ByteArrayInputStream bais = new ByteArrayInputStream(response);
XMLStreamReader xmlReader = StaxUtils.createXMLStreamReader(bais, "UTF-8");
@@ -147,7 +157,7 @@
}
} catch (XMLStreamException e) {
e.printStackTrace();
- }
+ }*/
}
}