You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2015/04/07 22:47:20 UTC
[5/6] cxf git commit: Update Metrics feature to support client side metrics
Update Metrics feature to support client side metrics
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/e5637ffc
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/e5637ffc
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/e5637ffc
Branch: refs/heads/master
Commit: e5637ffcc3a56923982c4254cffe54cc62130ee9
Parents: 72ba5f3
Author: Daniel Kulp <dk...@apache.org>
Authored: Tue Apr 7 16:28:11 2015 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Tue Apr 7 16:30:26 2015 -0400
----------------------------------------------------------------------
.../org/apache/cxf/metrics/MetricsFeature.java | 43 +++++++++++++
.../org/apache/cxf/metrics/MetricsProvider.java | 5 +-
.../codahale/CodahaleMetricsProvider.java | 19 +++---
.../AbstractMetricsInterceptor.java | 6 +-
.../interceptors/CountingOutInterceptor.java | 15 ++---
.../MetricsMessageClientOutInterceptor.java | 64 ++++++++++++++++++++
.../MetricsMessageInInterceptor.java | 9 +--
.../MetricsMessageInPostInvokeInterceptor.java | 37 +++++++++++
.../MetricsMessageInPreInvokeInterceptor.java | 14 +++--
.../MetricsMessageOutInterceptor.java | 7 ++-
10 files changed, 184 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsFeature.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsFeature.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsFeature.java
index 79e9fa3..df31225 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsFeature.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsFeature.java
@@ -21,11 +21,16 @@ package org.apache.cxf.metrics;
import org.apache.cxf.Bus;
import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.Server;
import org.apache.cxf.feature.AbstractFeature;
import org.apache.cxf.interceptor.InterceptorProvider;
import org.apache.cxf.metrics.interceptors.CountingOutInterceptor;
+import org.apache.cxf.metrics.interceptors.MetricsMessageClientOutInterceptor;
import org.apache.cxf.metrics.interceptors.MetricsMessageInInterceptor;
import org.apache.cxf.metrics.interceptors.MetricsMessageInOneWayInterceptor;
+import org.apache.cxf.metrics.interceptors.MetricsMessageInPostInvokeInterceptor;
import org.apache.cxf.metrics.interceptors.MetricsMessageInPreInvokeInterceptor;
import org.apache.cxf.metrics.interceptors.MetricsMessageOutInterceptor;
@@ -44,15 +49,53 @@ public class MetricsFeature extends AbstractFeature {
}
@Override
+ public void initialize(Server server, Bus bus) {
+ //can optimize for server case and just put interceptors it needs
+ Endpoint provider = server.getEndpoint();
+ MetricsMessageOutInterceptor out = new MetricsMessageOutInterceptor(providers);
+ CountingOutInterceptor countingOut = new CountingOutInterceptor();
+
+ provider.getInInterceptors().add(new MetricsMessageInInterceptor(providers));
+ provider.getInInterceptors().add(new MetricsMessageInOneWayInterceptor(providers));
+ provider.getInInterceptors().add(new MetricsMessageInPreInvokeInterceptor(providers));
+
+ provider.getOutInterceptors().add(countingOut);
+ provider.getOutInterceptors().add(out);
+ provider.getOutFaultInterceptors().add(countingOut);
+ provider.getOutFaultInterceptors().add(out);
+ }
+
+ @Override
+ public void initialize(Client client, Bus bus) {
+ //can optimize for client case and just put interceptors it needs
+ MetricsMessageOutInterceptor out = new MetricsMessageOutInterceptor(providers);
+ CountingOutInterceptor countingOut = new CountingOutInterceptor();
+
+ client.getInInterceptors().add(new MetricsMessageInInterceptor(providers));
+ client.getInInterceptors().add(new MetricsMessageInPostInvokeInterceptor(providers));
+ client.getInFaultInterceptors().add(new MetricsMessageInPostInvokeInterceptor(providers));
+ client.getOutInterceptors().add(countingOut);
+ client.getOutInterceptors().add(out);
+ client.getOutInterceptors().add(new MetricsMessageClientOutInterceptor(providers));
+ }
+
+
+ @Override
protected void initializeProvider(InterceptorProvider provider, Bus bus) {
+ //if feature is added to the bus, we need to add all the interceptors
MetricsMessageOutInterceptor out = new MetricsMessageOutInterceptor(providers);
CountingOutInterceptor countingOut = new CountingOutInterceptor();
provider.getInInterceptors().add(new MetricsMessageInInterceptor(providers));
provider.getInInterceptors().add(new MetricsMessageInOneWayInterceptor(providers));
provider.getInInterceptors().add(new MetricsMessageInPreInvokeInterceptor(providers));
+ provider.getInInterceptors().add(new MetricsMessageInPostInvokeInterceptor(providers));
+ provider.getInFaultInterceptors().add(new MetricsMessageInPreInvokeInterceptor(providers));
+ provider.getInFaultInterceptors().add(new MetricsMessageInPostInvokeInterceptor(providers));
+
provider.getOutInterceptors().add(countingOut);
provider.getOutInterceptors().add(out);
+ provider.getOutInterceptors().add(new MetricsMessageClientOutInterceptor(providers));
provider.getOutFaultInterceptors().add(countingOut);
provider.getOutFaultInterceptors().add(out);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsProvider.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsProvider.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsProvider.java
index 50d0fbd..e2bb994 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsProvider.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/MetricsProvider.java
@@ -26,9 +26,10 @@ import org.apache.cxf.service.model.BindingOperationInfo;
*
*/
public interface MetricsProvider {
+ String CLIENT_ID = "MetricsProvider.CLIENT_ID";
- MetricsContext createEndpointContext(Endpoint endpoint, boolean asClient);
+ MetricsContext createEndpointContext(Endpoint endpoint, boolean asClient, String cid);
- MetricsContext createOperationContext(Endpoint endpoint, BindingOperationInfo boi, boolean asClient);
+ MetricsContext createOperationContext(Endpoint endpoint, BindingOperationInfo boi, boolean asClient, String cid);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/codahale/CodahaleMetricsProvider.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/codahale/CodahaleMetricsProvider.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/codahale/CodahaleMetricsProvider.java
index b5388b7..6e520c9 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/codahale/CodahaleMetricsProvider.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/codahale/CodahaleMetricsProvider.java
@@ -40,7 +40,7 @@ import org.apache.cxf.service.model.BindingOperationInfo;
*
*/
@NoJSR250Annotations
-public class CodahaleMetricsProvider implements MetricsProvider {
+public class CodahaleMetricsProvider implements MetricsProvider {
private static final String QUESTION_MARK = "?";
private static final String ESCAPED_QUESTION_MARK = "\\?";
@@ -88,7 +88,7 @@ public class CodahaleMetricsProvider implements MetricsProvider {
return value;
}
- StringBuilder getBaseServiceName(Endpoint endpoint, boolean asClient) {
+ StringBuilder getBaseServiceName(Endpoint endpoint, boolean isClient, String clientId) {
StringBuilder buffer = new StringBuilder();
if (endpoint.get("org.apache.cxf.management.service.counter.name") != null) {
buffer.append((String)endpoint.get("org.apache.cxf.management.service.counter.name"));
@@ -101,14 +101,16 @@ public class CodahaleMetricsProvider implements MetricsProvider {
buffer.append(ManagementConstants.DEFAULT_DOMAIN_NAME + ":");
buffer.append(ManagementConstants.BUS_ID_PROP + "=" + bus.getId() + ",");
buffer.append(ManagementConstants.TYPE_PROP).append("=Metrics");
- if (asClient) {
+ if (isClient) {
buffer.append(".Client,");
} else {
buffer.append(".Server,");
}
buffer.append(ManagementConstants.SERVICE_NAME_PROP + "=" + serviceName + ",");
-
buffer.append(ManagementConstants.PORT_NAME_PROP + "=" + portName + ",");
+ if (clientId != null) {
+ buffer.append("Client=" + clientId + ",");
+ }
}
return buffer;
}
@@ -116,16 +118,17 @@ public class CodahaleMetricsProvider implements MetricsProvider {
/** {@inheritDoc}*/
@Override
- public MetricsContext createEndpointContext(final Endpoint endpoint, boolean asClient) {
- StringBuilder buffer = getBaseServiceName(endpoint, asClient);
+ public MetricsContext createEndpointContext(final Endpoint endpoint, boolean isClient, String clientId) {
+ StringBuilder buffer = getBaseServiceName(endpoint, isClient, clientId);
final String baseName = buffer.toString();
return new CodahaleMetricsContext(baseName, registry);
}
/** {@inheritDoc}*/
@Override
- public MetricsContext createOperationContext(Endpoint endpoint, BindingOperationInfo boi, boolean asClient) {
- StringBuilder buffer = getBaseServiceName(endpoint, asClient);
+ public MetricsContext createOperationContext(Endpoint endpoint, BindingOperationInfo boi,
+ boolean asClient, String clientId) {
+ StringBuilder buffer = getBaseServiceName(endpoint, asClient, clientId);
buffer.append("Operation=").append(boi.getName().getLocalPart()).append(',');
return new CodahaleMetricsContext(buffer.toString(), registry);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/AbstractMetricsInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/AbstractMetricsInterceptor.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/AbstractMetricsInterceptor.java
index dd9cc62..6955866 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/AbstractMetricsInterceptor.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/AbstractMetricsInterceptor.java
@@ -93,7 +93,8 @@ public abstract class AbstractMetricsInterceptor extends AbstractPhaseIntercepto
if (o == null) {
List<MetricsContext> contexts = new ArrayList<MetricsContext>();
for (MetricsProvider p : getMetricProviders(m.getExchange().getBus())) {
- MetricsContext c = p.createEndpointContext(ep, MessageUtils.isRequestor(m));
+ MetricsContext c = p.createEndpointContext(ep, MessageUtils.isRequestor(m),
+ (String)m.getContextualProperty(MetricsProvider.CLIENT_ID));
if (c != null) {
contexts.add(c);
}
@@ -137,7 +138,8 @@ public abstract class AbstractMetricsInterceptor extends AbstractPhaseIntercepto
List<MetricsContext> contexts = new ArrayList<MetricsContext>();
for (MetricsProvider p : getMetricProviders(message.getExchange().getBus())) {
MetricsContext c = p.createOperationContext(message.getExchange().getEndpoint(),
- boi, MessageUtils.isRequestor(message));
+ boi, MessageUtils.isRequestor(message),
+ (String)message.getContextualProperty(MetricsProvider.CLIENT_ID));
if (c != null) {
contexts.add(c);
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/CountingOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/CountingOutInterceptor.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/CountingOutInterceptor.java
index 4d20a9d..ce103ca 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/CountingOutInterceptor.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/CountingOutInterceptor.java
@@ -32,16 +32,11 @@ public class CountingOutInterceptor extends AbstractPhaseInterceptor<Message> {
addBefore(AttachmentOutInterceptor.class.getName());
}
public void handleMessage(Message message) throws Fault {
- if (isRequestor(message)) {
- //
- } else {
- OutputStream out = message.getContent(OutputStream.class);
- if (out != null) {
- CountingOutputStream newOut = new CountingOutputStream(out);
- message.setContent(OutputStream.class, newOut);
- message.getExchange().put(CountingOutputStream.class, newOut);
- }
-
+ OutputStream out = message.getContent(OutputStream.class);
+ if (out != null) {
+ CountingOutputStream newOut = new CountingOutputStream(out);
+ message.setContent(OutputStream.class, newOut);
+ message.getExchange().put(CountingOutputStream.class, newOut);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageClientOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageClientOutInterceptor.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageClientOutInterceptor.java
new file mode 100644
index 0000000..b5689be
--- /dev/null
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageClientOutInterceptor.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.metrics.interceptors;
+
+import java.io.InputStream;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.FaultMode;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.metrics.ExchangeMetrics;
+import org.apache.cxf.metrics.MetricsProvider;
+import org.apache.cxf.phase.Phase;
+
+public class MetricsMessageClientOutInterceptor extends AbstractMetricsInterceptor {
+ public MetricsMessageClientOutInterceptor(MetricsProvider p[]) {
+ super(Phase.SETUP, p);
+ addBefore("*");
+ }
+ public void handleMessage(Message message) throws Fault {
+ if (isRequestor(message)) {
+ ExchangeMetrics ctx = getExchangeMetrics(message, true);
+ InputStream in = message.getContent(InputStream.class);
+ if (in != null) {
+ CountingInputStream newIn = new CountingInputStream(in);
+ message.setContent(InputStream.class, newIn);
+ message.getExchange().put(CountingInputStream.class, newIn);
+ }
+ if (message.getExchange().getBindingOperationInfo() != null) {
+ //we now know the operation, start metrics for it
+ addOperationMetrics(ctx, message, message.getExchange().getBindingOperationInfo());
+ }
+ ctx.start();
+ }
+ }
+ public void handleFault(Message message) {
+ if (isRequestor(message)) {
+ Exception ex = message.getContent(Exception.class);
+ if (ex != null) {
+ FaultMode fm = message.getExchange().get(FaultMode.class);
+ message.getExchange().put(FaultMode.class, FaultMode.RUNTIME_FAULT);
+ stop(message);
+ message.getExchange().put(FaultMode.class, fm);
+ } else {
+ stop(message);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInInterceptor.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInInterceptor.java
index 6afc5c8..4f1ef2d 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInInterceptor.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInInterceptor.java
@@ -33,9 +33,7 @@ public class MetricsMessageInInterceptor extends AbstractMetricsInterceptor {
addBefore(AttachmentInInterceptor.class.getName());
}
public void handleMessage(Message message) throws Fault {
- if (isRequestor(message)) {
- //
- } else {
+ if (!isRequestor(message)) {
ExchangeMetrics ctx = getExchangeMetrics(message, true);
InputStream in = message.getContent(InputStream.class);
if (in != null) {
@@ -47,7 +45,10 @@ public class MetricsMessageInInterceptor extends AbstractMetricsInterceptor {
}
}
public void handleFault(Message message) {
- if (message.getExchange().isOneWay()) {
+ if (isRequestor(message)) {
+ //fault on the incoming chains for the client. It will be thrown back to client so stop
+ stop(message);
+ } else if (message.getExchange().isOneWay()) {
stop(message);
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPostInvokeInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPostInvokeInterceptor.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPostInvokeInterceptor.java
new file mode 100644
index 0000000..f1b5027
--- /dev/null
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPostInvokeInterceptor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.metrics.interceptors;
+
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.metrics.MetricsProvider;
+import org.apache.cxf.phase.Phase;
+
+public class MetricsMessageInPostInvokeInterceptor extends AbstractMetricsInterceptor {
+
+ public MetricsMessageInPostInvokeInterceptor(MetricsProvider p[]) {
+ super(Phase.POST_INVOKE, p);
+ }
+
+ public void handleMessage(Message message) throws Fault {
+ if (isRequestor(message)) {
+ stop(message);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPreInvokeInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPreInvokeInterceptor.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPreInvokeInterceptor.java
index 926a492..697d6cb 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPreInvokeInterceptor.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageInPreInvokeInterceptor.java
@@ -32,12 +32,14 @@ public class MetricsMessageInPreInvokeInterceptor extends AbstractMetricsInterce
}
public void handleMessage(Message message) throws Fault {
- Exchange ex = message.getExchange();
- if (ex.getBindingOperationInfo() != null) {
- //we now know the operation, start metrics for it
- ExchangeMetrics ctx = getExchangeMetrics(message, false);
- if (ctx != null) {
- addOperationMetrics(ctx, message, ex.getBindingOperationInfo());
+ if (!isRequestor(message)) {
+ Exchange ex = message.getExchange();
+ if (ex.getBindingOperationInfo() != null) {
+ //we now know the operation, start metrics for it
+ ExchangeMetrics ctx = getExchangeMetrics(message, false);
+ if (ctx != null) {
+ addOperationMetrics(ctx, message, ex.getBindingOperationInfo());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cxf/blob/e5637ffc/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageOutInterceptor.java
----------------------------------------------------------------------
diff --git a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageOutInterceptor.java b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageOutInterceptor.java
index 29186ba..78dd8de 100644
--- a/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageOutInterceptor.java
+++ b/rt/features/metrics/src/main/java/org/apache/cxf/metrics/interceptors/MetricsMessageOutInterceptor.java
@@ -30,9 +30,10 @@ public class MetricsMessageOutInterceptor extends AbstractMetricsInterceptor {
addBefore(MessageSenderInterceptor.MessageSenderEndingInterceptor.class.getName());
}
public void handleMessage(Message message) throws Fault {
- if (isRequestor(message)) {
- //
- } else {
+ if (!isRequestor(message)) {
+ stop(message);
+ } else if (message.getExchange().isOneWay()) {
+ //one way on the client, it's sent, now stop
stop(message);
}
}