You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2011/06/07 14:49:10 UTC
svn commit: r1132984 - in
/servicemix/components/trunk/bindings/servicemix-http/src:
main/java/org/apache/servicemix/http/endpoints/
test/java/org/apache/servicemix/http/
test/java/org/apache/servicemix/http/endpoints/
Author: gertv
Date: Tue Jun 7 12:49:09 2011
New Revision: 1132984
URL: http://svn.apache.org/viewvc?rev=1132984&view=rev
Log:
SMXCOMP-881: Allow configuring HTTP consumer endpoint for handling late responses more quietly
Added:
servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java
servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java
Modified:
servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
Modified: servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=1132984&r1=1132983&r2=1132984&view=diff
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Tue Jun 7 12:49:09 2011
@@ -53,7 +53,6 @@ import org.apache.servicemix.jbi.jaxp.So
import org.mortbay.jetty.RetryRequest;
import org.mortbay.util.ajax.Continuation;
import org.mortbay.util.ajax.ContinuationSupport;
-import org.mortbay.util.ajax.WaitingContinuation;
import static org.apache.servicemix.http.jetty.ContinuationHelper.isNewContinuation;
@@ -79,6 +78,7 @@ public class HttpConsumerEndpoint extend
private Map<String, Object> mutexes = new ConcurrentHashMap<String, Object>();
private Object httpContext;
private boolean started = false;
+ private LateResponseStrategy lateResponseStrategy = LateResponseStrategy.error;
public HttpConsumerEndpoint() {
super();
@@ -205,6 +205,25 @@ public class HttpConsumerEndpoint extend
this.defaultMep = defaultMep;
}
+ public String getLateResponseStrategy() {
+ return lateResponseStrategy.name();
+ }
+
+ /**
+ * Set the strategy to be used for handling a late response from the ESB (i.e. a response that arrives after the HTTP request has timed out).
+ * Defaults to <code>error</code>
+ *
+ * <ul>
+ * <li><code>error</code> will terminate the exchange with an ERROR status and log an exception for the late response</li>
+ * <li><code>warning</code> will end the exchange with a DONE status and log a warning for the late response instead</li>
+ * </ul>
+ *
+ * @param value
+ */
+ public void setLateResponseStrategy(String value) {
+ this.lateResponseStrategy = LateResponseStrategy.valueOf(value);
+ }
+
public void activate() throws Exception {
super.activate();
loadStaticResources();
@@ -347,8 +366,6 @@ public class HttpConsumerEndpoint extend
* Handle the HTTP response based on the information in the message exchange we received
*/
private void handleResponse(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response) throws Exception {
- // At this point, we have received the exchange response,
- // so process it and send back the HTTP response
if (exchange.getStatus() == ExchangeStatus.ERROR) {
Exception e = exchange.getError();
if (e == null) {
@@ -381,11 +398,15 @@ public class HttpConsumerEndpoint extend
* Handle a message exchange that is being received after the corresponding HTTP request has timed out
*/
private void handleLateResponse(MessageExchange exchange) throws Exception {
- throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
-
- // TODO: allow multiple options for handling late response from the ESB
- // - by throwing an exception to make the exchange end in error
- // - by logging a warning (make sure MEP gets handled appropriately here!)
+ // if the exchange is no longer active by now, something else probably went wrong in the meanwhile
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ if (lateResponseStrategy == LateResponseStrategy.error) {
+ fail(exchange, new Exception("HTTP request has timed out for exchange: {} " + exchange.getExchangeId()));
+ } else {
+ logger.warn("HTTP request has timed out for exchange: {}", exchange.getExchangeId());
+ done(exchange);
+ }
+ }
}
/*
@@ -536,4 +557,21 @@ public class HttpConsumerEndpoint extend
((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
}
}
+
+ /**
+ * Determines how the HTTP consumer endpoint should handle a late response from the NMR
+ */
+ protected enum LateResponseStrategy {
+
+ /**
+ * Terminate the exchange with an ERROR status and log an exception for the late response
+ */
+ error,
+
+ /**
+ * End the exchange with a DONE status and log a warning for the late response
+ */
+ warning
+
+ }
}
Added: servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java?rev=1132984&view=auto
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java (added)
+++ servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java Tue Jun 7 12:49:09 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * By default, the Maven build will find free port numbers and assign them to system properties for our tests to use.
+ *
+ * However, when running the unit tests in an IDE, this mechanism is not always available so this class provides a
+ * helper method to graciously fall back to a random port number for running the tests.
+ */
+public class PortFinder {
+
+ private static final int DEFAULT_PORT = 10101;
+
+ private PortFinder() {
+ // hiding the constructor, only static helper methods in this class
+ }
+
+ /**
+ * Find the port number that's defined in the system properties. If there's no valid port number there,
+ * this method will try to return a random, free port number.
+ *
+ * @param name the system property
+ * @return the port number defined or a random, free port number
+ */
+ public static int find(String name) {
+ String port = System.getProperty(name);
+
+ if (port == null) {
+ return findFreePort();
+ }
+
+ try {
+ return Integer.parseInt(port);
+ } catch (NumberFormatException e) {
+ return findFreePort();
+ }
+ }
+
+ private static int findFreePort() {
+ ServerSocket server = null;
+ try {
+ server = new ServerSocket(0);
+ int port = server.getLocalPort();
+ server.close();
+ return port;
+ } catch (IOException e) {
+ return DEFAULT_PORT;
+ }
+ }
+}
Added: servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java?rev=1132984&view=auto
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java (added)
+++ servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java Tue Jun 7 12:49:09 2011
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http.endpoints;
+
+import junit.framework.TestCase;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.http.HttpComponent;
+import org.apache.servicemix.http.HttpEndpointType;
+import org.apache.servicemix.http.PortFinder;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.helper.MessageUtil;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.*;
+import javax.xml.namespace.QName;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test cases for the {@link HttpConsumerEndpoint#lateResponseStrategy} behaviour
+ */
+public class HttpConsumerLateResponseHandlingTest extends TestCase {
+
+ private static final long TIMEOUT = 500;
+
+ protected JBIContainer container;
+
+ private static final int port1 = PortFinder.find("http.port1");
+
+ protected void setUp() throws Exception {
+ container = new JBIContainer();
+ container.setUseMBeanServer(false);
+ container.setCreateMBeanServer(false);
+ container.setEmbedded(true);
+ container.init();
+ }
+
+ protected void tearDown() throws Exception {
+ if (container != null) {
+ container.shutDown();
+ }
+ }
+
+ public void testInOutWithStrategyError() throws Exception {
+ MessageExchange exchange = doTestInOutWithStrategy(HttpConsumerEndpoint.LateResponseStrategy.error);
+ assertEquals("Exchange should have ended in ERROR", ExchangeStatus.ERROR, exchange.getStatus());
+ assertNotNull(exchange.getError());
+ }
+
+ public void testInOutWithStrategyWarning() throws Exception {
+ MessageExchange exchange = doTestInOutWithStrategy(HttpConsumerEndpoint.LateResponseStrategy.warning);
+ assertEquals("Exchange should have ended normally", ExchangeStatus.DONE, exchange.getStatus());
+ }
+
+ /*
+ * Perform test for strategy and return MessageExchange object being sent/received
+ */
+ private MessageExchange doTestInOutWithStrategy(HttpConsumerEndpoint.LateResponseStrategy strategy) throws JBIException, IOException, InterruptedException {
+ HttpComponent http = new HttpComponent();
+ HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
+ ep.setService(new QName("urn:test", "svc"));
+ ep.setEndpoint("ep");
+ ep.setLateResponseStrategy(strategy.name());
+ ep.setTimeout(TIMEOUT);
+ ep.setTargetService(new QName("urn:test", "echo"));
+ ep.setLocationURI("http://localhost:" + port1 + "/ep1/");
+ http.setEndpoints(new HttpEndpointType[]{ep});
+ container.activateComponent(http, "http");
+
+ final CountDownLatch latch = new CountDownLatch(2);
+ final AtomicReference<MessageExchange> reference = new AtomicReference<MessageExchange>();
+
+ EchoComponent echo = new EchoComponent() {
+ @Override
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ // enable content re-readability now before HTTP request has timed out
+ MessageUtil.enableContentRereadability(exchange.getMessage("in"));
+
+ reference.set(exchange);
+
+ if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+ try {
+ Thread.sleep(2 * TIMEOUT);
+ } catch (InterruptedException e) {
+ // graciously ignore this, the unit test itself will fail because it never hits the timemout
+ }
+ }
+
+ super.onMessageExchange(exchange);
+ latch.countDown();
+ }
+ };
+ echo.setService(new QName("urn:test", "echo"));
+ echo.setEndpoint("endpoint");
+ container.activateComponent(echo, "echo");
+
+ container.start();
+
+ PostMethod post = new PostMethod("http://localhost:" + port1 + "/ep1/");
+ post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
+ new HttpClient().executeMethod(post);
+
+ assertEquals("HTTP request should have timed out", 500, post.getStatusCode());
+
+ post.releaseConnection();
+
+ // let's wait for the MEP to be completely handled
+ latch.await(2 * TIMEOUT, TimeUnit.MILLISECONDS);
+
+ container.deactivateComponent("echo");
+ container.deactivateComponent("http");
+
+ return reference.get();
+ }
+}