You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2008/04/24 19:09:36 UTC
svn commit: r651317 - in
/servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src:
main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java
test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
Author: gnodet
Date: Thu Apr 24 10:09:20 2008
New Revision: 651317
URL: http://svn.apache.org/viewvc?rev=651317&view=rev
Log:
SM-454: Request / response correlator to bridge InOut with InOnly
Added:
servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java
servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
Added: servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java?rev=651317&view=auto
==============================================================================
--- servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java (added)
+++ servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/AsyncBridge.java Thu Apr 24 10:09:20 2008
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import java.util.Date;
+import java.util.concurrent.TimeoutException;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.InOut;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.expression.Expression;
+import org.apache.servicemix.expression.PropertyExpression;
+import org.apache.servicemix.jbi.util.MessageUtil;
+import org.apache.servicemix.timers.Timer;
+import org.apache.servicemix.timers.TimerListener;
+
+/**
+ *
+ * @author gnodet
+ * @org.apache.xbean.XBean element="async-bridge"
+ */
+public class AsyncBridge extends EIPEndpoint {
+
+ public static final String CORRID = "org.apache.servicemix.eip.asyncbridge.corrid";
+
+ private static final Log LOG = LogFactory.getLog(AsyncBridge.class);
+
+ private Expression requestCorrId = new Expression() {
+ public Object evaluate(MessageExchange exchange, NormalizedMessage message) throws MessagingException {
+ return exchange.getExchangeId();
+ }
+ };
+ private String responseCorrIdProperty = CORRID;
+ private Expression responseCorrId;
+ private long timeout;
+ private ExchangeTarget target;
+ private boolean useRobustInOnly;
+
+ /**
+ * @return the timeout
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * @param timeout the timeout to set
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * @return the target
+ */
+ public ExchangeTarget getTarget() {
+ return target;
+ }
+
+ /**
+ * @param target the target to set
+ */
+ public void setTarget(ExchangeTarget target) {
+ this.target = target;
+ }
+
+ /**
+ * @return the requestCorrId
+ */
+ public Expression getRequestCorrId() {
+ return requestCorrId;
+ }
+
+ /**
+ * @param requestCorrId the requestCorrId to set
+ */
+ public void setRequestCorrId(Expression requestCorrId) {
+ this.requestCorrId = requestCorrId;
+ }
+
+ /**
+ * @return the responseCorrIdProperty
+ */
+ public String getResponseCorrIdProperty() {
+ return responseCorrIdProperty;
+ }
+
+ /**
+ * @param responseCorrIdProperty the responseCorrIdProperty to set
+ */
+ public void setResponseCorrIdProperty(String responseCorrIdProperty) {
+ this.responseCorrIdProperty = responseCorrIdProperty;
+ }
+
+ /**
+ * @return the responseCorrId
+ */
+ public Expression getResponseCorrId() {
+ return responseCorrId;
+ }
+
+ /**
+ * @param responseCorrId the responseCorrId to set
+ */
+ public void setResponseCorrId(Expression responseCorrId) {
+ this.responseCorrId = responseCorrId;
+ }
+
+ /**
+ * @return the useRobustInOnly
+ */
+ public boolean isUseRobustInOnly() {
+ return useRobustInOnly;
+ }
+
+ /**
+ * @param useRobustInOnly the useRobustInOnly to set
+ */
+ public void setUseRobustInOnly(boolean useRobustInOnly) {
+ this.useRobustInOnly = useRobustInOnly;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#start()
+ */
+ public void start() throws Exception {
+ super.start();
+ if (responseCorrId == null) {
+ responseCorrId = new PropertyExpression(responseCorrIdProperty);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processSync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processSync(MessageExchange exchange) throws Exception {
+ throw new IllegalStateException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.eip.EIPEndpoint#processAsync(javax.jbi.messaging.MessageExchange)
+ */
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ throw new IllegalStateException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+ */
+ public void process(MessageExchange exchange) throws Exception {
+ // Handle an exchange as a PROVIDER
+ if (exchange.getRole() == MessageExchange.Role.PROVIDER) {
+ // receive the InOut request
+ // => send the In to the target
+ if (exchange instanceof InOut && exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ final String correlationId = (String) requestCorrId.evaluate(exchange, exchange.getMessage("in"));
+ if (correlationId == null || correlationId.length() == 0) {
+ throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
+ }
+ store.store(correlationId, exchange);
+ MessageExchange tme = useRobustInOnly ? getExchangeFactory().createRobustInOnlyExchange()
+ : getExchangeFactory().createInOnlyExchange();
+ target.configureTarget(tme, getContext());
+ MessageUtil.transferInToIn(exchange, tme);
+ tme.setProperty(responseCorrIdProperty, correlationId);
+ tme.getMessage("in").setProperty(responseCorrIdProperty, correlationId);
+ sendSync(tme);
+ // an error
+ if (tme.getStatus() == ExchangeStatus.ERROR) {
+ store.load(correlationId);
+ fail(exchange, tme.getError());
+ return;
+ // a fault ?
+ } else if (tme.getStatus() == ExchangeStatus.ACTIVE) {
+ store.load(correlationId);
+ MessageUtil.transferFaultToFault(tme, exchange);
+ send(tme);
+ done(tme);
+ return;
+ // request sent
+ } else {
+ Date exchangeTimeout = getTimeout(exchange);
+ if (exchangeTimeout != null) {
+ getTimerManager().schedule(new TimerListener() {
+ public void timerExpired(Timer timer) {
+ AsyncBridge.this.onTimeout(correlationId);
+ }
+ }, exchangeTimeout);
+ }
+ }
+ // receive the done / error for the InOut request
+ } else if (exchange instanceof InOut && exchange.getStatus() != ExchangeStatus.ACTIVE) {
+ // ignore these exchanges
+ // Receive the response
+ } else if (exchange instanceof InOnly || exchange instanceof RobustInOnly) {
+ final String correlationId = (String) responseCorrId.evaluate(exchange, exchange.getMessage("in"));
+ if (correlationId == null || correlationId.length() == 0) {
+ throw new IllegalArgumentException("Could not retrieve correlation id for incoming exchange");
+ }
+ MessageExchange request = (MessageExchange) store.load(correlationId);
+ // The request is found and has not timed out
+ if (request != null) {
+ MessageUtil.transferInToOut(exchange, request);
+ sendSync(request);
+ }
+ done(exchange);
+ } else {
+ throw new IllegalStateException();
+ }
+ // Handle an exchange as a CONSUMER
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
+ protected void onTimeout(String correlationId) {
+ try {
+ MessageExchange request = (MessageExchange) store.load(correlationId);
+ if (request != null) {
+ fail(request, new TimeoutException());
+ }
+ } catch (Exception e) {
+ LOG.debug("Exception caught when handling timeout", e);
+ }
+ }
+
+ protected Date getTimeout(MessageExchange exchange) {
+ if (timeout > 0) {
+ return new Date(System.currentTimeMillis() + timeout);
+ }
+ return null;
+ }
+
+}
Added: servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java?rev=651317&view=auto
==============================================================================
--- servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java (added)
+++ servicemix/smx3/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/AsyncBridgeTest.java Thu Apr 24 10:09:20 2008
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import java.util.concurrent.TimeoutException;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOut;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.components.util.TraceComponent;
+import org.apache.servicemix.eip.patterns.AsyncBridge;
+import org.apache.servicemix.eip.patterns.WireTap;
+
+public class AsyncBridgeTest extends AbstractEIPTest {
+
+ protected AsyncBridge asyncBridge;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ asyncBridge = new AsyncBridge();
+ asyncBridge.setTarget(createServiceExchangeTarget(new QName("target")));
+ asyncBridge.setTimeout(2000);
+ configurePattern(asyncBridge);
+ activateComponent(asyncBridge, "asyncBridge");
+ }
+
+ protected void configureContainer() throws Exception {
+ }
+
+ public void testInOut() throws Exception {
+ WireTap wireTap = new WireTap();
+ wireTap.setCopyProperties(true);
+ wireTap.setTarget(createServiceExchangeTarget(new QName("asyncBridge")));
+ activateComponent(wireTap, "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ assertNotNull(me.getOutMessage());
+ client.done(me);
+
+ Thread.sleep(100);
+ }
+
+ public void testInOutWithTimeOut() throws Exception {
+ activateComponent(new TraceComponent(), "target");
+
+ InOut me = client.createInOutExchange();
+ me.setService(new QName("asyncBridge"));
+ me.getInMessage().setContent(createSource("<hello/>"));
+ client.sendSync(me);
+ assertEquals(ExchangeStatus.ERROR, me.getStatus());
+ assertTrue(me.getError() instanceof TimeoutException);
+
+ Thread.sleep(100);
+ }
+
+
+}