You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2011/06/07 14:49:10 UTC

svn commit: r1132984 - in /servicemix/components/trunk/bindings/servicemix-http/src: main/java/org/apache/servicemix/http/endpoints/ test/java/org/apache/servicemix/http/ test/java/org/apache/servicemix/http/endpoints/

Author: gertv
Date: Tue Jun  7 12:49:09 2011
New Revision: 1132984

URL: http://svn.apache.org/viewvc?rev=1132984&view=rev
Log:
SMXCOMP-881: Allow configuring HTTP consumer endpoint for handling late responses more quietly

Added:
    servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java
    servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java
Modified:
    servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

Modified: servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java?rev=1132984&r1=1132983&r2=1132984&view=diff
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java (original)
+++ servicemix/components/trunk/bindings/servicemix-http/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java Tue Jun  7 12:49:09 2011
@@ -53,7 +53,6 @@ import org.apache.servicemix.jbi.jaxp.So
 import org.mortbay.jetty.RetryRequest;
 import org.mortbay.util.ajax.Continuation;
 import org.mortbay.util.ajax.ContinuationSupport;
-import org.mortbay.util.ajax.WaitingContinuation;
 
 import static org.apache.servicemix.http.jetty.ContinuationHelper.isNewContinuation;
 
@@ -79,6 +78,7 @@ public class HttpConsumerEndpoint extend
     private Map<String, Object> mutexes = new ConcurrentHashMap<String, Object>();
     private Object httpContext;
     private boolean started = false;
+    private LateResponseStrategy lateResponseStrategy = LateResponseStrategy.error;
 
     public HttpConsumerEndpoint() {
         super();
@@ -205,6 +205,25 @@ public class HttpConsumerEndpoint extend
         this.defaultMep = defaultMep;
     }
 
+    public String getLateResponseStrategy() {
+        return lateResponseStrategy.name();
+    }
+
+    /**
+     * Set the strategy to be used for handling a late response from the ESB (i.e. a response that arrives after the HTTP request has timed out).
+     * Defaults to <code>error</code>
+     *
+     * <ul>
+     *     <li><code>error</code> will terminate the exchange with an ERROR status and log an exception for the late response</li>
+     *     <li><code>warning</code> will end the exchange with a DONE status and log a warning for the late response instead</li>
+     * </ul>
+     *
+     * @param value
+     */
+    public void setLateResponseStrategy(String value) {
+        this.lateResponseStrategy = LateResponseStrategy.valueOf(value);
+    }
+
     public void activate() throws Exception {
         super.activate();
         loadStaticResources();
@@ -347,8 +366,6 @@ public class HttpConsumerEndpoint extend
      * Handle the HTTP response based on the information in the message exchange we received
      */
     private void handleResponse(MessageExchange exchange, HttpServletRequest request, HttpServletResponse response) throws Exception {
-        // At this point, we have received the exchange response,
-        // so process it and send back the HTTP response
         if (exchange.getStatus() == ExchangeStatus.ERROR) {
             Exception e = exchange.getError();
             if (e == null) {
@@ -381,11 +398,15 @@ public class HttpConsumerEndpoint extend
      * Handle a message exchange that is being received after the corresponding HTTP request has timed out
      */
     private void handleLateResponse(MessageExchange exchange) throws Exception {
-        throw new Exception("HTTP request has timed out for exchange: " + exchange.getExchangeId());
-
-        // TODO: allow multiple options for handling late response from the ESB
-        // - by throwing an exception to make the exchange end in error
-        // - by logging a warning (make sure MEP gets handled appropriately here!)
+        // if the exchange is no longer active by now, something else probably went wrong in the meanwhile
+        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+            if (lateResponseStrategy == LateResponseStrategy.error) {
+                fail(exchange, new Exception("HTTP request has timed out for exchange: {} " + exchange.getExchangeId()));
+            } else {
+                logger.warn("HTTP request has timed out for exchange: {}", exchange.getExchangeId());
+                done(exchange);
+            }
+        }
     }
 
     /*
@@ -536,4 +557,21 @@ public class HttpConsumerEndpoint extend
             ((DefaultHttpConsumerMarshaler) marshaler).setDefaultMep(getDefaultMep());
         }
     }
+
+    /**
+     * Determines how the HTTP consumer endpoint should handle a late response from the NMR
+     */
+    protected enum LateResponseStrategy {
+
+        /**
+         * Terminate the exchange with an ERROR status and log an exception for the late response
+         */
+        error,
+
+        /**
+         * End the exchange with a DONE status and log a warning for the late response
+         */
+        warning
+
+    }
 }

Added: servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java?rev=1132984&view=auto
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java (added)
+++ servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/PortFinder.java Tue Jun  7 12:49:09 2011
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+/**
+ * By default, the Maven build will find free port numbers and assign them to system properties for our tests to use.
+ *
+ * However, when running the unit tests in an IDE, this mechanism is not always available so this class provides a
+ * helper method to graciously fall back to a random port number for running the tests.
+ */
+public class PortFinder {
+
+    private static final int DEFAULT_PORT = 10101;
+
+    private PortFinder() {
+        // hiding the constructor, only static helper methods in this class
+    }
+
+    /**
+     * Find the port number that's defined in the system properties.  If there's no valid port number there,
+     * this method will try to return a random, free port number.
+     *
+     * @param name the system property
+     * @return the port number defined or a random, free port number
+     */
+    public static int find(String name) {
+        String port = System.getProperty(name);
+
+        if (port == null) {
+            return findFreePort();
+        }
+
+        try {
+            return Integer.parseInt(port);
+        } catch (NumberFormatException e) {
+            return findFreePort();
+        }
+    }
+
+    private static int findFreePort() {
+        ServerSocket server = null;
+        try {
+            server = new ServerSocket(0);
+            int port = server.getLocalPort();
+            server.close();
+            return port;
+        } catch (IOException e) {
+            return DEFAULT_PORT;
+        }
+    }
+}

Added: servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java
URL: http://svn.apache.org/viewvc/servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java?rev=1132984&view=auto
==============================================================================
--- servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java (added)
+++ servicemix/components/trunk/bindings/servicemix-http/src/test/java/org/apache/servicemix/http/endpoints/HttpConsumerLateResponseHandlingTest.java Tue Jun  7 12:49:09 2011
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.http.endpoints;
+
+import junit.framework.TestCase;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.servicemix.components.util.EchoComponent;
+import org.apache.servicemix.http.HttpComponent;
+import org.apache.servicemix.http.HttpEndpointType;
+import org.apache.servicemix.http.PortFinder;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.helper.MessageUtil;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.*;
+import javax.xml.namespace.QName;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test cases for the {@link HttpConsumerEndpoint#lateResponseStrategy} behaviour
+ */
+public class HttpConsumerLateResponseHandlingTest extends TestCase {
+
+    private static final long TIMEOUT = 500;
+
+    protected JBIContainer container;
+
+    private static final int port1 = PortFinder.find("http.port1");
+
+    protected void setUp() throws Exception {
+        container = new JBIContainer();
+        container.setUseMBeanServer(false);
+        container.setCreateMBeanServer(false);
+        container.setEmbedded(true);
+        container.init();
+    }
+
+    protected void tearDown() throws Exception {
+        if (container != null) {
+            container.shutDown();
+        }
+    }
+
+    public void testInOutWithStrategyError() throws Exception {
+        MessageExchange exchange = doTestInOutWithStrategy(HttpConsumerEndpoint.LateResponseStrategy.error);
+        assertEquals("Exchange should have ended in ERROR", ExchangeStatus.ERROR, exchange.getStatus());
+        assertNotNull(exchange.getError());
+    }
+
+    public void testInOutWithStrategyWarning() throws Exception {
+        MessageExchange exchange = doTestInOutWithStrategy(HttpConsumerEndpoint.LateResponseStrategy.warning);
+        assertEquals("Exchange should have ended normally", ExchangeStatus.DONE, exchange.getStatus());
+    }
+
+    /*
+     * Perform test for strategy and return MessageExchange object being sent/received
+     */
+    private MessageExchange doTestInOutWithStrategy(HttpConsumerEndpoint.LateResponseStrategy strategy) throws JBIException, IOException, InterruptedException {
+        HttpComponent http = new HttpComponent();
+        HttpConsumerEndpoint ep = new HttpConsumerEndpoint();
+        ep.setService(new QName("urn:test", "svc"));
+        ep.setEndpoint("ep");
+        ep.setLateResponseStrategy(strategy.name());
+        ep.setTimeout(TIMEOUT);
+        ep.setTargetService(new QName("urn:test", "echo"));
+        ep.setLocationURI("http://localhost:" + port1 + "/ep1/");
+        http.setEndpoints(new HttpEndpointType[]{ep});
+        container.activateComponent(http, "http");
+
+        final CountDownLatch latch = new CountDownLatch(2);
+        final AtomicReference<MessageExchange> reference = new AtomicReference<MessageExchange>();
+
+        EchoComponent echo = new EchoComponent() {
+            @Override
+            public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+                // enable content re-readability now before HTTP request has timed out
+                MessageUtil.enableContentRereadability(exchange.getMessage("in"));
+
+                reference.set(exchange);
+
+                if (ExchangeStatus.ACTIVE.equals(exchange.getStatus())) {
+                    try {
+                        Thread.sleep(2 * TIMEOUT);
+                    } catch (InterruptedException e) {
+                        // graciously ignore this, the unit test itself will fail because it never hits the timemout
+                    }
+                }
+
+                super.onMessageExchange(exchange);
+                latch.countDown();
+            }
+        };
+        echo.setService(new QName("urn:test", "echo"));
+        echo.setEndpoint("endpoint");
+        container.activateComponent(echo, "echo");
+
+        container.start();
+
+        PostMethod post = new PostMethod("http://localhost:" + port1 + "/ep1/");
+        post.setRequestEntity(new StringRequestEntity("<hello>world</hello>"));
+        new HttpClient().executeMethod(post);
+
+        assertEquals("HTTP request should have timed out", 500, post.getStatusCode());
+
+        post.releaseConnection();
+
+        // let's wait for the MEP to be completely handled
+        latch.await(2 * TIMEOUT, TimeUnit.MILLISECONDS);
+
+        container.deactivateComponent("echo");
+        container.deactivateComponent("http");
+
+        return reference.get();
+    }
+}