You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ff...@apache.org on 2016/04/06 04:35:17 UTC

camel git commit: [CAMEL-9821]add mep uri param for camel-cxf endpoint

Repository: camel
Updated Branches:
  refs/heads/master 0437f7db6 -> 92becd2a9


[CAMEL-9821]add mep uri param for camel-cxf endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92becd2a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92becd2a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92becd2a

Branch: refs/heads/master
Commit: 92becd2a94dc0135dee0b92c8caff31eb51dae31
Parents: 0437f7d
Author: Freeman Fang <fr...@gmail.com>
Authored: Wed Apr 6 10:34:45 2016 +0800
Committer: Freeman Fang <fr...@gmail.com>
Committed: Wed Apr 6 10:34:45 2016 +0800

----------------------------------------------------------------------
 .../apache/camel/component/cxf/CxfConsumer.java |  5 ++
 .../apache/camel/component/cxf/CxfEndpoint.java | 19 +++++++
 .../cxf/feature/RAWDataFormatFeature.java       | 26 ++++++++++
 .../OneWayOutgoingChainInterceptor.java         | 52 ++++++++++++++++++++
 4 files changed, 102 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index fab7267..5b32832 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -172,7 +172,12 @@ public class CxfConsumer extends DefaultConsumer {
                     if (boi.getOperationInfo().isOneWay()) {
                         camelExchange.setPattern(ExchangePattern.InOnly);
                     }
+                } else {
+                    if (cxfEndpoint.getMep() != null && cxfEndpoint.getMep().equals("InOnly")) {
+                        camelExchange.setPattern(ExchangePattern.InOnly);
+                    }
                 }
+
                 
                 // set data format mode in Camel exchange
                 camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat);   

http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
index 8dd5442..12b9ce0 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
@@ -192,6 +192,8 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     private String password;
     @UriParam(label = "advanced", prefix = "properties.", multiValue = true)
     private Map<String, Object> properties;
+    @UriParam(name = "mep")
+    private String mep;
 
     public CxfEndpoint() {
     }
@@ -309,6 +311,11 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
                 sfb.setDataBinding(new SourceDataBinding());
             } else if (getDataFormat().dealias() == DataFormat.RAW) {
                 RAWDataFormatFeature feature = new RAWDataFormatFeature();
+                if (this.getMep() != null && this.getMep().equals("InOnly")) {
+                    //if DataFormat is RAW|MESSAGE, can't read message so can't
+                    //determine it's oneway so need get the MEP from URI explicitly
+                    feature.setOneway(true);
+                }
                 feature.addInIntercepters(getInInterceptors());
                 feature.addOutInterceptors(getOutInterceptors());
                 sfb.getFeatures().add(feature);
@@ -1120,6 +1127,18 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         this.username = username;
     }
 
+    public String getMep() {
+        return mep;
+    }
+
+    /**
+     * The Message Exchange Pattern
+     */
+    public void setMep(String mep) {
+        this.mep = mep;
+    }
+
+
     /**
      * We need to override the {@link ClientImpl#setParameters} method
      * to insert parameters into CXF Message for {@link DataFormat#PAYLOAD} mode.

http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
index ead2489..22f67fc 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
@@ -17,12 +17,16 @@
 
 package org.apache.camel.component.cxf.feature;
 
+import org.apache.camel.component.cxf.interceptors.OneWayOutgoingChainInterceptor;
 import org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor;
 import org.apache.camel.component.cxf.interceptors.RawMessageWSDLGetInterceptor;
 import org.apache.cxf.Bus;
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.interceptor.LoggingOutInterceptor;
+import org.apache.cxf.interceptor.OneWayProcessorInterceptor;
+import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.Phase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +53,8 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature {
     // filter the unused in phase interceptor
     private static final String[] REMAINING_OUT_PHASES = {Phase.PREPARE_SEND, Phase.USER_STREAM,
         Phase.WRITE, Phase.SEND, Phase.PREPARE_SEND_ENDING};
+    
+    private boolean oneway;
 
     @Override
     public void initialize(Client client, Bus bus) {
@@ -89,12 +95,32 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature {
 
         // setup the RawMessageWSDLGetInterceptor
         server.getEndpoint().getInInterceptors().add(RawMessageWSDLGetInterceptor.INSTANCE);
+        // Oneway with RAW message
+        if (isOneway()) {
+            Interceptor<? extends Message> toRemove = null;
+            for (Interceptor<? extends Message> i : server.getEndpoint().getService().getInInterceptors()) {
+                if (i.getClass().getName().equals("org.apache.cxf.interceptor.OutgoingChainInterceptor")) {
+                    toRemove = i;
+                }
+            }
+            server.getEndpoint().getService().getInInterceptors().remove(toRemove);
+            server.getEndpoint().getInInterceptors().add(new OneWayOutgoingChainInterceptor());
+            server.getEndpoint().getInInterceptors().add(new OneWayProcessorInterceptor());
+        }
     }
 
     @Override
     protected Logger getLogger() {
         return LOG;
     }
+
+    public boolean isOneway() {
+        return oneway;
+    }
+
+    public void setOneway(boolean oneway) {
+        this.oneway = oneway;
+    }
    
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
new file mode 100644
index 0000000..f586847
--- /dev/null
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cxf.interceptors;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.cxf.interceptor.OutgoingChainInterceptor;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.phase.AbstractPhaseInterceptor;
+import org.apache.cxf.phase.Phase;
+
+
+public class OneWayOutgoingChainInterceptor extends AbstractPhaseInterceptor<Message> {
+    
+    public OneWayOutgoingChainInterceptor() {
+        super(Phase.POST_INVOKE);
+        this.addBefore(OutgoingChainInterceptor.class.getName());
+    }
+
+    public void handleMessage(Message message) {
+        closeInput(message);
+        return;
+    }
+    
+    private void closeInput(Message message) {
+        InputStream is = message.getContent(InputStream.class);
+        if (is != null) {
+            try {
+                is.close();
+                message.removeContent(InputStream.class);
+            } catch (IOException ioex) {
+                //ignore
+            }
+        }
+    }
+}


Re: camel git commit: [CAMEL-9821]add mep uri param for camel-cxf endpoint

Posted by Freeman Fang <fr...@gmail.com>.
Thanks Claus to point this out, on it!
-------------
Freeman(Yue) Fang

Red Hat, Inc. 
FuseSource is now part of Red Hat



> On Apr 6, 2016, at 12:58 PM, Claus Ibsen <cl...@gmail.com> wrote:
> 
> -1
> 
> There is already an exchangePattern option from the DefaultEndpoint.
> 
> Can you not use that, instead of inventing a new duplicate option?
> 
> 
> 
> On Wed, Apr 6, 2016 at 4:35 AM,  <ff...@apache.org> wrote:
>> Repository: camel
>> Updated Branches:
>>  refs/heads/master 0437f7db6 -> 92becd2a9
>> 
>> 
>> [CAMEL-9821]add mep uri param for camel-cxf endpoint
>> 
>> 
>> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92becd2a
>> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92becd2a
>> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92becd2a
>> 
>> Branch: refs/heads/master
>> Commit: 92becd2a94dc0135dee0b92c8caff31eb51dae31
>> Parents: 0437f7d
>> Author: Freeman Fang <fr...@gmail.com>
>> Authored: Wed Apr 6 10:34:45 2016 +0800
>> Committer: Freeman Fang <fr...@gmail.com>
>> Committed: Wed Apr 6 10:34:45 2016 +0800
>> 
>> ----------------------------------------------------------------------
>> .../apache/camel/component/cxf/CxfConsumer.java |  5 ++
>> .../apache/camel/component/cxf/CxfEndpoint.java | 19 +++++++
>> .../cxf/feature/RAWDataFormatFeature.java       | 26 ++++++++++
>> .../OneWayOutgoingChainInterceptor.java         | 52 ++++++++++++++++++++
>> 4 files changed, 102 insertions(+)
>> ----------------------------------------------------------------------
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
>> ----------------------------------------------------------------------
>> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
>> index fab7267..5b32832 100644
>> --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
>> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
>> @@ -172,7 +172,12 @@ public class CxfConsumer extends DefaultConsumer {
>>                     if (boi.getOperationInfo().isOneWay()) {
>>                         camelExchange.setPattern(ExchangePattern.InOnly);
>>                     }
>> +                } else {
>> +                    if (cxfEndpoint.getMep() != null && cxfEndpoint.getMep().equals("InOnly")) {
>> +                        camelExchange.setPattern(ExchangePattern.InOnly);
>> +                    }
>>                 }
>> +
>> 
>>                 // set data format mode in Camel exchange
>>                 camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat);
>> 
>> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
>> ----------------------------------------------------------------------
>> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
>> index 8dd5442..12b9ce0 100644
>> --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
>> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
>> @@ -192,6 +192,8 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
>>     private String password;
>>     @UriParam(label = "advanced", prefix = "properties.", multiValue = true)
>>     private Map<String, Object> properties;
>> +    @UriParam(name = "mep")
>> +    private String mep;
>> 
>>     public CxfEndpoint() {
>>     }
>> @@ -309,6 +311,11 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
>>                 sfb.setDataBinding(new SourceDataBinding());
>>             } else if (getDataFormat().dealias() == DataFormat.RAW) {
>>                 RAWDataFormatFeature feature = new RAWDataFormatFeature();
>> +                if (this.getMep() != null && this.getMep().equals("InOnly")) {
>> +                    //if DataFormat is RAW|MESSAGE, can't read message so can't
>> +                    //determine it's oneway so need get the MEP from URI explicitly
>> +                    feature.setOneway(true);
>> +                }
>>                 feature.addInIntercepters(getInInterceptors());
>>                 feature.addOutInterceptors(getOutInterceptors());
>>                 sfb.getFeatures().add(feature);
>> @@ -1120,6 +1127,18 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
>>         this.username = username;
>>     }
>> 
>> +    public String getMep() {
>> +        return mep;
>> +    }
>> +
>> +    /**
>> +     * The Message Exchange Pattern
>> +     */
>> +    public void setMep(String mep) {
>> +        this.mep = mep;
>> +    }
>> +
>> +
>>     /**
>>      * We need to override the {@link ClientImpl#setParameters} method
>>      * to insert parameters into CXF Message for {@link DataFormat#PAYLOAD} mode.
>> 
>> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
>> ----------------------------------------------------------------------
>> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
>> index ead2489..22f67fc 100644
>> --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
>> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
>> @@ -17,12 +17,16 @@
>> 
>> package org.apache.camel.component.cxf.feature;
>> 
>> +import org.apache.camel.component.cxf.interceptors.OneWayOutgoingChainInterceptor;
>> import org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor;
>> import org.apache.camel.component.cxf.interceptors.RawMessageWSDLGetInterceptor;
>> import org.apache.cxf.Bus;
>> import org.apache.cxf.endpoint.Client;
>> import org.apache.cxf.endpoint.Server;
>> +import org.apache.cxf.interceptor.Interceptor;
>> import org.apache.cxf.interceptor.LoggingOutInterceptor;
>> +import org.apache.cxf.interceptor.OneWayProcessorInterceptor;
>> +import org.apache.cxf.message.Message;
>> import org.apache.cxf.phase.Phase;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> @@ -49,6 +53,8 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature {
>>     // filter the unused in phase interceptor
>>     private static final String[] REMAINING_OUT_PHASES = {Phase.PREPARE_SEND, Phase.USER_STREAM,
>>         Phase.WRITE, Phase.SEND, Phase.PREPARE_SEND_ENDING};
>> +
>> +    private boolean oneway;
>> 
>>     @Override
>>     public void initialize(Client client, Bus bus) {
>> @@ -89,12 +95,32 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature {
>> 
>>         // setup the RawMessageWSDLGetInterceptor
>>         server.getEndpoint().getInInterceptors().add(RawMessageWSDLGetInterceptor.INSTANCE);
>> +        // Oneway with RAW message
>> +        if (isOneway()) {
>> +            Interceptor<? extends Message> toRemove = null;
>> +            for (Interceptor<? extends Message> i : server.getEndpoint().getService().getInInterceptors()) {
>> +                if (i.getClass().getName().equals("org.apache.cxf.interceptor.OutgoingChainInterceptor")) {
>> +                    toRemove = i;
>> +                }
>> +            }
>> +            server.getEndpoint().getService().getInInterceptors().remove(toRemove);
>> +            server.getEndpoint().getInInterceptors().add(new OneWayOutgoingChainInterceptor());
>> +            server.getEndpoint().getInInterceptors().add(new OneWayProcessorInterceptor());
>> +        }
>>     }
>> 
>>     @Override
>>     protected Logger getLogger() {
>>         return LOG;
>>     }
>> +
>> +    public boolean isOneway() {
>> +        return oneway;
>> +    }
>> +
>> +    public void setOneway(boolean oneway) {
>> +        this.oneway = oneway;
>> +    }
>> 
>> 
>> }
>> 
>> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
>> ----------------------------------------------------------------------
>> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
>> new file mode 100644
>> index 0000000..f586847
>> --- /dev/null
>> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
>> @@ -0,0 +1,52 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +
>> +package org.apache.camel.component.cxf.interceptors;
>> +
>> +import java.io.IOException;
>> +import java.io.InputStream;
>> +
>> +import org.apache.cxf.interceptor.OutgoingChainInterceptor;
>> +import org.apache.cxf.message.Message;
>> +import org.apache.cxf.phase.AbstractPhaseInterceptor;
>> +import org.apache.cxf.phase.Phase;
>> +
>> +
>> +public class OneWayOutgoingChainInterceptor extends AbstractPhaseInterceptor<Message> {
>> +
>> +    public OneWayOutgoingChainInterceptor() {
>> +        super(Phase.POST_INVOKE);
>> +        this.addBefore(OutgoingChainInterceptor.class.getName());
>> +    }
>> +
>> +    public void handleMessage(Message message) {
>> +        closeInput(message);
>> +        return;
>> +    }
>> +
>> +    private void closeInput(Message message) {
>> +        InputStream is = message.getContent(InputStream.class);
>> +        if (is != null) {
>> +            try {
>> +                is.close();
>> +                message.removeContent(InputStream.class);
>> +            } catch (IOException ioex) {
>> +                //ignore
>> +            }
>> +        }
>> +    }
>> +}
>> 
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2


Re: camel git commit: [CAMEL-9821]add mep uri param for camel-cxf endpoint

Posted by Claus Ibsen <cl...@gmail.com>.
-1

There is already an exchangePattern option from the DefaultEndpoint.

Can you not use that, instead of inventing a new duplicate option?



On Wed, Apr 6, 2016 at 4:35 AM,  <ff...@apache.org> wrote:
> Repository: camel
> Updated Branches:
>   refs/heads/master 0437f7db6 -> 92becd2a9
>
>
> [CAMEL-9821]add mep uri param for camel-cxf endpoint
>
>
> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92becd2a
> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92becd2a
> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92becd2a
>
> Branch: refs/heads/master
> Commit: 92becd2a94dc0135dee0b92c8caff31eb51dae31
> Parents: 0437f7d
> Author: Freeman Fang <fr...@gmail.com>
> Authored: Wed Apr 6 10:34:45 2016 +0800
> Committer: Freeman Fang <fr...@gmail.com>
> Committed: Wed Apr 6 10:34:45 2016 +0800
>
> ----------------------------------------------------------------------
>  .../apache/camel/component/cxf/CxfConsumer.java |  5 ++
>  .../apache/camel/component/cxf/CxfEndpoint.java | 19 +++++++
>  .../cxf/feature/RAWDataFormatFeature.java       | 26 ++++++++++
>  .../OneWayOutgoingChainInterceptor.java         | 52 ++++++++++++++++++++
>  4 files changed, 102 insertions(+)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
> ----------------------------------------------------------------------
> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
> index fab7267..5b32832 100644
> --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
> @@ -172,7 +172,12 @@ public class CxfConsumer extends DefaultConsumer {
>                      if (boi.getOperationInfo().isOneWay()) {
>                          camelExchange.setPattern(ExchangePattern.InOnly);
>                      }
> +                } else {
> +                    if (cxfEndpoint.getMep() != null && cxfEndpoint.getMep().equals("InOnly")) {
> +                        camelExchange.setPattern(ExchangePattern.InOnly);
> +                    }
>                  }
> +
>
>                  // set data format mode in Camel exchange
>                  camelExchange.setProperty(CxfConstants.DATA_FORMAT_PROPERTY, dataFormat);
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
> ----------------------------------------------------------------------
> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
> index 8dd5442..12b9ce0 100644
> --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfEndpoint.java
> @@ -192,6 +192,8 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
>      private String password;
>      @UriParam(label = "advanced", prefix = "properties.", multiValue = true)
>      private Map<String, Object> properties;
> +    @UriParam(name = "mep")
> +    private String mep;
>
>      public CxfEndpoint() {
>      }
> @@ -309,6 +311,11 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
>                  sfb.setDataBinding(new SourceDataBinding());
>              } else if (getDataFormat().dealias() == DataFormat.RAW) {
>                  RAWDataFormatFeature feature = new RAWDataFormatFeature();
> +                if (this.getMep() != null && this.getMep().equals("InOnly")) {
> +                    //if DataFormat is RAW|MESSAGE, can't read message so can't
> +                    //determine it's oneway so need get the MEP from URI explicitly
> +                    feature.setOneway(true);
> +                }
>                  feature.addInIntercepters(getInInterceptors());
>                  feature.addOutInterceptors(getOutInterceptors());
>                  sfb.getFeatures().add(feature);
> @@ -1120,6 +1127,18 @@ public class CxfEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
>          this.username = username;
>      }
>
> +    public String getMep() {
> +        return mep;
> +    }
> +
> +    /**
> +     * The Message Exchange Pattern
> +     */
> +    public void setMep(String mep) {
> +        this.mep = mep;
> +    }
> +
> +
>      /**
>       * We need to override the {@link ClientImpl#setParameters} method
>       * to insert parameters into CXF Message for {@link DataFormat#PAYLOAD} mode.
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
> ----------------------------------------------------------------------
> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
> index ead2489..22f67fc 100644
> --- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/feature/RAWDataFormatFeature.java
> @@ -17,12 +17,16 @@
>
>  package org.apache.camel.component.cxf.feature;
>
> +import org.apache.camel.component.cxf.interceptors.OneWayOutgoingChainInterceptor;
>  import org.apache.camel.component.cxf.interceptors.RawMessageContentRedirectInterceptor;
>  import org.apache.camel.component.cxf.interceptors.RawMessageWSDLGetInterceptor;
>  import org.apache.cxf.Bus;
>  import org.apache.cxf.endpoint.Client;
>  import org.apache.cxf.endpoint.Server;
> +import org.apache.cxf.interceptor.Interceptor;
>  import org.apache.cxf.interceptor.LoggingOutInterceptor;
> +import org.apache.cxf.interceptor.OneWayProcessorInterceptor;
> +import org.apache.cxf.message.Message;
>  import org.apache.cxf.phase.Phase;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
> @@ -49,6 +53,8 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature {
>      // filter the unused in phase interceptor
>      private static final String[] REMAINING_OUT_PHASES = {Phase.PREPARE_SEND, Phase.USER_STREAM,
>          Phase.WRITE, Phase.SEND, Phase.PREPARE_SEND_ENDING};
> +
> +    private boolean oneway;
>
>      @Override
>      public void initialize(Client client, Bus bus) {
> @@ -89,12 +95,32 @@ public class RAWDataFormatFeature extends AbstractDataFormatFeature {
>
>          // setup the RawMessageWSDLGetInterceptor
>          server.getEndpoint().getInInterceptors().add(RawMessageWSDLGetInterceptor.INSTANCE);
> +        // Oneway with RAW message
> +        if (isOneway()) {
> +            Interceptor<? extends Message> toRemove = null;
> +            for (Interceptor<? extends Message> i : server.getEndpoint().getService().getInInterceptors()) {
> +                if (i.getClass().getName().equals("org.apache.cxf.interceptor.OutgoingChainInterceptor")) {
> +                    toRemove = i;
> +                }
> +            }
> +            server.getEndpoint().getService().getInInterceptors().remove(toRemove);
> +            server.getEndpoint().getInInterceptors().add(new OneWayOutgoingChainInterceptor());
> +            server.getEndpoint().getInInterceptors().add(new OneWayProcessorInterceptor());
> +        }
>      }
>
>      @Override
>      protected Logger getLogger() {
>          return LOG;
>      }
> +
> +    public boolean isOneway() {
> +        return oneway;
> +    }
> +
> +    public void setOneway(boolean oneway) {
> +        this.oneway = oneway;
> +    }
>
>
>  }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/92becd2a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
> ----------------------------------------------------------------------
> diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
> new file mode 100644
> index 0000000..f586847
> --- /dev/null
> +++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/interceptors/OneWayOutgoingChainInterceptor.java
> @@ -0,0 +1,52 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.camel.component.cxf.interceptors;
> +
> +import java.io.IOException;
> +import java.io.InputStream;
> +
> +import org.apache.cxf.interceptor.OutgoingChainInterceptor;
> +import org.apache.cxf.message.Message;
> +import org.apache.cxf.phase.AbstractPhaseInterceptor;
> +import org.apache.cxf.phase.Phase;
> +
> +
> +public class OneWayOutgoingChainInterceptor extends AbstractPhaseInterceptor<Message> {
> +
> +    public OneWayOutgoingChainInterceptor() {
> +        super(Phase.POST_INVOKE);
> +        this.addBefore(OutgoingChainInterceptor.class.getName());
> +    }
> +
> +    public void handleMessage(Message message) {
> +        closeInput(message);
> +        return;
> +    }
> +
> +    private void closeInput(Message message) {
> +        InputStream is = message.getContent(InputStream.class);
> +        if (is != null) {
> +            try {
> +                is.close();
> +                message.removeContent(InputStream.class);
> +            } catch (IOException ioex) {
> +                //ignore
> +            }
> +        }
> +    }
> +}
>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2