You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/04/06 18:21:15 UTC

svn commit: r1310446 [2/4] - in /cxf/trunk: rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/wsdl/extensions/ rt/core/src/main/java/org/apache/cxf/bus/extension/ rt/core/src/main/java/org/apache/cxf/bus/osgi/ rt/core/src/main/java/org/apache/...

Modified: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java (original)
+++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java Fri Apr  6 16:21:14 2012
@@ -1,139 +1,139 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.common.gzip;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.cxf.helpers.HttpHeaderHelper;
-import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.interceptor.InterceptorChain;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.message.ExchangeImpl;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.message.MessageImpl;
-import org.easymock.EasyMock;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.FORCE;
-import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.YES;
-
-/**
- * Test for the parsing of Accept-Encoding by the GZIPOutInterceptor. For
- * Accept-Encoding values that enable gzip we expect an extra interceptor to be
- * added to the out message, and the {@link GZIPOutInterceptor#USE_GZIP_KEY} to
- * be set correctly. For Accept-Encoding values that do not enable gzip the
- * interceptor should not be added.
- */
-public class GZIPAcceptEncodingTest extends Assert {
-
-    private GZIPOutInterceptor interceptor;
-    private Message inMessage;
-    private Message outMessage;
-    private InterceptorChain outInterceptors;
-
-    @Before
-    public void setUp() throws Exception {
-        interceptor = new GZIPOutInterceptor();
-        inMessage = new MessageImpl();
-        outMessage = new MessageImpl();
-        Exchange exchange = new ExchangeImpl();
-        exchange.setInMessage(inMessage);
-        inMessage.setExchange(exchange);
-        inMessage.setContent(InputStream.class, new ByteArrayInputStream(new byte[0]));
-        exchange.setOutMessage(outMessage);
-        outMessage.setExchange(exchange);
-        outMessage.setContent(OutputStream.class, new ByteArrayOutputStream());
-        outInterceptors = EasyMock.createMock(InterceptorChain.class);
-        outMessage.setInterceptorChain(outInterceptors);
-    }
-
-    @Test
-    public void testNoAcceptEncoding() throws Exception {
-        EasyMock.replay(outInterceptors);
-        interceptor.handleMessage(outMessage);
-    }
-
-    @Test
-    public void testAcceptGzip() throws Exception {
-        singleTest("gzip", true, YES, "gzip");
-    }
-
-    @Test
-    public void testAcceptXGzip() throws Exception {
-        singleTest("x-gzip, x-compress", true, YES, "x-gzip");
-    }
-
-    @Test
-    public void testAcceptStar() throws Exception {
-        singleTest("*", true, YES, "gzip");
-    }
-
-    @Test
-    public void testAcceptOnlyGzip() throws Exception {
-        singleTest("gzip, identity; q=0", true, FORCE, "gzip");
-    }
-
-    @Test
-    public void testOnlyIdentitySupported() throws Exception {
-        singleTest("deflate", false, null, null);
-    }
-
-    @Test
-    public void testGzipExplicitlyDisabled() throws Exception {
-        singleTest("gzip; q=0.00", false, null, null);
-    }
-
-    @Test(expected = Fault.class)
-    public void testNoValidEncodings() throws Exception {
-        EasyMock.replay();
-        setAcceptEncoding("*;q=0, deflate;q=0.5");
-        interceptor.handleMessage(outMessage);
-    }
-
-    private void singleTest(String encoding, boolean expectEndingInterceptor,
-                            GZIPOutInterceptor.UseGzip expectedUseGzip, String expectedGzipEncoding)
-        throws Exception {
-
-        EasyMock.replay(outInterceptors);
-        setAcceptEncoding(encoding);
-        interceptor.handleMessage(outMessage);
-        assertSame("Wrong value of " + GZIPOutInterceptor.USE_GZIP_KEY, expectedUseGzip, outMessage
-            .get(GZIPOutInterceptor.USE_GZIP_KEY));
-        assertEquals("Wrong value of " + GZIPOutInterceptor.GZIP_ENCODING_KEY, expectedGzipEncoding,
-                     outMessage.get(GZIPOutInterceptor.GZIP_ENCODING_KEY));
-    }
-
-    private void setAcceptEncoding(String enc) {
-        Map<String, List<String>> protocolHeaders 
-            = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
-        protocolHeaders.put(HttpHeaderHelper.getHeaderKey(HttpHeaderHelper.ACCEPT_ENCODING), Collections
-            .singletonList(enc));
-        inMessage.put(Message.PROTOCOL_HEADERS, protocolHeaders);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.common.gzip;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.easymock.EasyMock;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.FORCE;
+import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.YES;
+
+/**
+ * Test for the parsing of Accept-Encoding by the GZIPOutInterceptor. For
+ * Accept-Encoding values that enable gzip we expect an extra interceptor to be
+ * added to the out message, and the {@link GZIPOutInterceptor#USE_GZIP_KEY} to
+ * be set correctly. For Accept-Encoding values that do not enable gzip the
+ * interceptor should not be added.
+ */
+public class GZIPAcceptEncodingTest extends Assert {
+
+    private GZIPOutInterceptor interceptor;
+    private Message inMessage;
+    private Message outMessage;
+    private InterceptorChain outInterceptors;
+
+    @Before
+    public void setUp() throws Exception {
+        interceptor = new GZIPOutInterceptor();
+        inMessage = new MessageImpl();
+        outMessage = new MessageImpl();
+        Exchange exchange = new ExchangeImpl();
+        exchange.setInMessage(inMessage);
+        inMessage.setExchange(exchange);
+        inMessage.setContent(InputStream.class, new ByteArrayInputStream(new byte[0]));
+        exchange.setOutMessage(outMessage);
+        outMessage.setExchange(exchange);
+        outMessage.setContent(OutputStream.class, new ByteArrayOutputStream());
+        outInterceptors = EasyMock.createMock(InterceptorChain.class);
+        outMessage.setInterceptorChain(outInterceptors);
+    }
+
+    @Test
+    public void testNoAcceptEncoding() throws Exception {
+        EasyMock.replay(outInterceptors);
+        interceptor.handleMessage(outMessage);
+    }
+
+    @Test
+    public void testAcceptGzip() throws Exception {
+        singleTest("gzip", true, YES, "gzip");
+    }
+
+    @Test
+    public void testAcceptXGzip() throws Exception {
+        singleTest("x-gzip, x-compress", true, YES, "x-gzip");
+    }
+
+    @Test
+    public void testAcceptStar() throws Exception {
+        singleTest("*", true, YES, "gzip");
+    }
+
+    @Test
+    public void testAcceptOnlyGzip() throws Exception {
+        singleTest("gzip, identity; q=0", true, FORCE, "gzip");
+    }
+
+    @Test
+    public void testOnlyIdentitySupported() throws Exception {
+        singleTest("deflate", false, null, null);
+    }
+
+    @Test
+    public void testGzipExplicitlyDisabled() throws Exception {
+        singleTest("gzip; q=0.00", false, null, null);
+    }
+
+    @Test(expected = Fault.class)
+    public void testNoValidEncodings() throws Exception {
+        EasyMock.replay();
+        setAcceptEncoding("*;q=0, deflate;q=0.5");
+        interceptor.handleMessage(outMessage);
+    }
+
+    private void singleTest(String encoding, boolean expectEndingInterceptor,
+                            GZIPOutInterceptor.UseGzip expectedUseGzip, String expectedGzipEncoding)
+        throws Exception {
+
+        EasyMock.replay(outInterceptors);
+        setAcceptEncoding(encoding);
+        interceptor.handleMessage(outMessage);
+        assertSame("Wrong value of " + GZIPOutInterceptor.USE_GZIP_KEY, expectedUseGzip, outMessage
+            .get(GZIPOutInterceptor.USE_GZIP_KEY));
+        assertEquals("Wrong value of " + GZIPOutInterceptor.GZIP_ENCODING_KEY, expectedGzipEncoding,
+                     outMessage.get(GZIPOutInterceptor.GZIP_ENCODING_KEY));
+    }
+
+    private void setAcceptEncoding(String enc) {
+        Map<String, List<String>> protocolHeaders 
+            = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
+        protocolHeaders.put(HttpHeaderHelper.getHeaderKey(HttpHeaderHelper.ACCEPT_ENCODING), Collections
+            .singletonList(enc));
+        inMessage.put(Message.PROTOCOL_HEADERS, protocolHeaders);
+    }
+}

Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java Fri Apr  6 16:21:14 2012
@@ -1,165 +1,165 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.clustering;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.xml.namespace.QName;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.service.model.ServiceInfo;
-
-/**
- * Failover strategy based on a static cluster represented by
- * multiple endpoints associated with the same service instance.
- */
-public abstract class AbstractStaticFailoverStrategy implements FailoverStrategy {
-    private static final Logger LOG =
-        LogUtils.getL7dLogger(AbstractStaticFailoverStrategy.class);
-
-    private List<String> alternateAddresses;
-    private long delayBetweenRetries;
-
-    public void setDelayBetweenRetries(long delay) {
-        this.delayBetweenRetries = delay;
-    }
-    
-    public long getDelayBetweenRetries() {
-        return this.delayBetweenRetries;
-    }
-    
-    public void setAlternateAddresses(List<String> alternateAddresses) {
-        this.alternateAddresses = alternateAddresses;
-    }
-   
-    /**
-     * Get the alternate addresses for this invocation.
-     * 
-     * @param exchange the current Exchange
-     * @return a List of alternate addresses if available
-     */
-    public List<String> getAlternateAddresses(Exchange exchange) {
-        return alternateAddresses != null
-               ? new ArrayList<String>(alternateAddresses)
-               : null;
-    }
-
-    /**
-     * Select one of the alternate addresses for a retried invocation.
-     * 
-     * @param a List of alternate addresses if available
-     * @return the selected address
-     */
-    public String selectAlternateAddress(List<String> alternates) {
-        String selected = null;
-        if (alternates != null && alternates.size() > 0) {
-            selected = getNextAlternate(alternates);
-            LOG.log(Level.WARNING,
-                    "FAILING_OVER_TO_ADDRESS_OVERRIDE",
-                    selected);
-        } else {
-            LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
-        }
-        return selected;
-    }
-
-    /**
-     * Get the alternate endpoints for this invocation.
-     * 
-     * @param exchange the current Exchange
-     * @return a List of alternate endpoints if available
-     */
-    public List<Endpoint> getAlternateEndpoints(Exchange exchange) {
-        return getEndpoints(exchange, false);
-    }
-    
-    /**
-     * Select one of the alternate endpoints for a retried invocation.
-     * 
-     * @param a List of alternate endpoints if available
-     * @return the selected endpoint
-     */
-    public Endpoint selectAlternateEndpoint(List<Endpoint> alternates) {
-        Endpoint selected = null;
-        if (alternates != null && alternates.size() > 0) {
-            selected = getNextAlternate(alternates);
-            LOG.log(Level.WARNING,
-                    "FAILING_OVER_TO_ALTERNATE_ENDPOINT",
-                     new Object[] {selected.getEndpointInfo().getName(),
-                                   selected.getEndpointInfo().getAddress()});
-        } else {
-            LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
-        }
-        return selected;
-    }
-    
-    /**
-     * Get the endpoints for this invocation.
-     * 
-     * @param exchange the current Exchange
-     * @param acceptCandidatesWithSameAddress true to accept candidates with the same address
-     * @return a List of alternate endpoints if available
-     */
-    protected List<Endpoint> getEndpoints(Exchange exchange, boolean acceptCandidatesWithSameAddress) {
-        Endpoint endpoint = exchange.get(Endpoint.class);
-        Collection<ServiceInfo> services = endpoint.getService().getServiceInfos();
-        QName currentBinding = endpoint.getBinding().getBindingInfo().getName();
-        List<Endpoint> alternates = new ArrayList<Endpoint>();
-        for (ServiceInfo service : services) {
-            Collection<EndpointInfo> candidates = service.getEndpoints();
-            for (EndpointInfo candidate : candidates) {
-                QName candidateBinding = candidate.getBinding().getName();
-                if (candidateBinding.equals(currentBinding)) {
-                    if (acceptCandidatesWithSameAddress || !candidate.getAddress().equals(
-                             endpoint.getEndpointInfo().getAddress())) {
-                        Endpoint alternate =
-                            endpoint.getService().getEndpoints().get(candidate.getName());
-                        if (alternate != null) {
-                            LOG.log(Level.INFO,
-                                    "FAILOVER_CANDIDATE_ACCEPTED",
-                                    candidate.getName());
-                            alternates.add(alternate);
-                        }
-                    }
-                } else {
-                    LOG.log(Level.INFO,
-                            "FAILOVER_CANDIDATE_REJECTED",
-                            new Object[] {candidate.getName(), candidateBinding});
-                }
-            }
-        }
-        return alternates;
-    }
-
-    /**
-     * Get next alternate endpoint.
-     * 
-     * @param alternates non-empty List of alternate endpoints 
-     * @return
-     */
-    protected abstract <T> T getNextAlternate(List<T> alternates);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.ServiceInfo;
+
+/**
+ * Failover strategy based on a static cluster represented by
+ * multiple endpoints associated with the same service instance.
+ */
+public abstract class AbstractStaticFailoverStrategy implements FailoverStrategy {
+    private static final Logger LOG =
+        LogUtils.getL7dLogger(AbstractStaticFailoverStrategy.class);
+
+    private List<String> alternateAddresses;
+    private long delayBetweenRetries;
+
+    public void setDelayBetweenRetries(long delay) {
+        this.delayBetweenRetries = delay;
+    }
+    
+    public long getDelayBetweenRetries() {
+        return this.delayBetweenRetries;
+    }
+    
+    public void setAlternateAddresses(List<String> alternateAddresses) {
+        this.alternateAddresses = alternateAddresses;
+    }
+   
+    /**
+     * Get the alternate addresses for this invocation.
+     * 
+     * @param exchange the current Exchange
+     * @return a List of alternate addresses if available
+     */
+    public List<String> getAlternateAddresses(Exchange exchange) {
+        return alternateAddresses != null
+               ? new ArrayList<String>(alternateAddresses)
+               : null;
+    }
+
+    /**
+     * Select one of the alternate addresses for a retried invocation.
+     * 
+     * @param alternates a List of alternate addresses if available
+     * @return the selected address
+     */
+    public String selectAlternateAddress(List<String> alternates) {
+        String selected = null;
+        if (alternates != null && alternates.size() > 0) {
+            selected = getNextAlternate(alternates);
+            LOG.log(Level.WARNING,
+                    "FAILING_OVER_TO_ADDRESS_OVERRIDE",
+                    selected);
+        } else {
+            LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
+        }
+        return selected;
+    }
+
+    /**
+     * Get the alternate endpoints for this invocation.
+     * 
+     * @param exchange the current Exchange
+     * @return a List of alternate endpoints if available
+     */
+    public List<Endpoint> getAlternateEndpoints(Exchange exchange) {
+        return getEndpoints(exchange, false);
+    }
+    
+    /**
+     * Select one of the alternate endpoints for a retried invocation.
+     * 
+     * @param alternates a List of alternate endpoints if available
+     * @return the selected endpoint
+     */
+    public Endpoint selectAlternateEndpoint(List<Endpoint> alternates) {
+        Endpoint selected = null;
+        if (alternates != null && alternates.size() > 0) {
+            selected = getNextAlternate(alternates);
+            LOG.log(Level.WARNING,
+                    "FAILING_OVER_TO_ALTERNATE_ENDPOINT",
+                     new Object[] {selected.getEndpointInfo().getName(),
+                                   selected.getEndpointInfo().getAddress()});
+        } else {
+            LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
+        }
+        return selected;
+    }
+    
+    /**
+     * Get the endpoints for this invocation.
+     * 
+     * @param exchange the current Exchange
+     * @param acceptCandidatesWithSameAddress true to accept candidates with the same address
+     * @return a List of alternate endpoints if available
+     */
+    protected List<Endpoint> getEndpoints(Exchange exchange, boolean acceptCandidatesWithSameAddress) {
+        Endpoint endpoint = exchange.get(Endpoint.class);
+        Collection<ServiceInfo> services = endpoint.getService().getServiceInfos();
+        QName currentBinding = endpoint.getBinding().getBindingInfo().getName();
+        List<Endpoint> alternates = new ArrayList<Endpoint>();
+        for (ServiceInfo service : services) {
+            Collection<EndpointInfo> candidates = service.getEndpoints();
+            for (EndpointInfo candidate : candidates) {
+                QName candidateBinding = candidate.getBinding().getName();
+                if (candidateBinding.equals(currentBinding)) {
+                    if (acceptCandidatesWithSameAddress || !candidate.getAddress().equals(
+                             endpoint.getEndpointInfo().getAddress())) {
+                        Endpoint alternate =
+                            endpoint.getService().getEndpoints().get(candidate.getName());
+                        if (alternate != null) {
+                            LOG.log(Level.INFO,
+                                    "FAILOVER_CANDIDATE_ACCEPTED",
+                                    candidate.getName());
+                            alternates.add(alternate);
+                        }
+                    }
+                } else {
+                    LOG.log(Level.INFO,
+                            "FAILOVER_CANDIDATE_REJECTED",
+                            new Object[] {candidate.getName(), candidateBinding});
+                }
+            }
+        }
+        return alternates;
+    }
+
+    /**
+     * Get next alternate endpoint.
+     * 
+     * @param alternates non-empty List of alternate endpoints 
+     * @return the selected alternate
+     */
+    protected abstract <T> T getNextAlternate(List<T> alternates);
+}

Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java Fri Apr  6 16:21:14 2012
@@ -1,84 +1,84 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.clustering;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.annotations.EvaluateAllEndpoints;
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-import org.apache.cxf.endpoint.Client;
-import org.apache.cxf.endpoint.ConduitSelector;
-import org.apache.cxf.endpoint.ConduitSelectorHolder;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.feature.AbstractFeature;
-import org.apache.cxf.interceptor.InterceptorProvider;
-
-/**
- * This feature may be applied to a Client so as to enable
- * failover from the initial target endpoint to any other
- * compatible endpoint for the target service.
- */
-@NoJSR250Annotations
-@EvaluateAllEndpoints
-public class FailoverFeature extends AbstractFeature {
-
-    private FailoverStrategy failoverStrategy;
-    private FailoverTargetSelector targetSelector;
-    
-    @Override
-    protected void initializeProvider(InterceptorProvider provider, Bus bus) {
-        if (provider instanceof ConduitSelectorHolder) {
-            ConduitSelectorHolder csHolder = (ConduitSelectorHolder) provider;
-            Endpoint endpoint = csHolder.getConduitSelector().getEndpoint();
-            ConduitSelector conduitSelector = initTargetSelector(endpoint);
-            csHolder.setConduitSelector(conduitSelector);
-        }
-    }
-
-    @Override
-    public void initialize(Client client, Bus bus) {
-        ConduitSelector selector = initTargetSelector(client.getConduitSelector().getEndpoint());
-        client.setConduitSelector(selector);
-    }
-
-    protected ConduitSelector initTargetSelector(Endpoint endpoint) {
-        FailoverTargetSelector selector = getTargetSelector();
-        selector.setEndpoint(endpoint);
-        selector.setStrategy(getStrategy());
-        return selector;
-    }
-    
-    public FailoverTargetSelector getTargetSelector() {
-        if (this.targetSelector == null) {
-            this.targetSelector = new FailoverTargetSelector();
-        }
-        return this.targetSelector;
-    }
-    
-    public void setTargetSelector(FailoverTargetSelector selector) {
-        this.targetSelector = selector;
-    }
-    
-    public void setStrategy(FailoverStrategy strategy) {
-        failoverStrategy = strategy;
-    }
-    
-    public FailoverStrategy getStrategy()  {
-        return failoverStrategy;
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.clustering;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.annotations.EvaluateAllEndpoints;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ConduitSelector;
+import org.apache.cxf.endpoint.ConduitSelectorHolder;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.feature.AbstractFeature;
+import org.apache.cxf.interceptor.InterceptorProvider;
+
+/**
+ * This feature may be applied to a Client so as to enable
+ * failover from the initial target endpoint to any other
+ * compatible endpoint for the target service.
+ */
+@NoJSR250Annotations
+@EvaluateAllEndpoints
+public class FailoverFeature extends AbstractFeature {
+
+    private FailoverStrategy failoverStrategy;
+    private FailoverTargetSelector targetSelector;
+    
+    @Override
+    protected void initializeProvider(InterceptorProvider provider, Bus bus) {
+        if (provider instanceof ConduitSelectorHolder) {
+            ConduitSelectorHolder csHolder = (ConduitSelectorHolder) provider;
+            Endpoint endpoint = csHolder.getConduitSelector().getEndpoint();
+            ConduitSelector conduitSelector = initTargetSelector(endpoint);
+            csHolder.setConduitSelector(conduitSelector);
+        }
+    }
+
+    @Override
+    public void initialize(Client client, Bus bus) {
+        ConduitSelector selector = initTargetSelector(client.getConduitSelector().getEndpoint());
+        client.setConduitSelector(selector);
+    }
+
+    protected ConduitSelector initTargetSelector(Endpoint endpoint) {
+        FailoverTargetSelector selector = getTargetSelector();
+        selector.setEndpoint(endpoint);
+        selector.setStrategy(getStrategy());
+        return selector;
+    }
+    
+    public FailoverTargetSelector getTargetSelector() {
+        if (this.targetSelector == null) {
+            this.targetSelector = new FailoverTargetSelector();
+        }
+        return this.targetSelector;
+    }
+    
+    public void setTargetSelector(FailoverTargetSelector selector) {
+        this.targetSelector = selector;
+    }
+    
+    public void setStrategy(FailoverStrategy strategy) {
+        failoverStrategy = strategy;
+    }
+    
+    public FailoverStrategy getStrategy()  {
+        return failoverStrategy;
+    }
+}

Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java Fri Apr  6 16:21:14 2012
@@ -1,64 +1,64 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.clustering;
-
-import java.util.List;
-
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.message.Exchange;
-
-/**
- * Supports pluggable strategies for alternate endpoint selection on
- * failover.
- */
-public interface FailoverStrategy {
-    /**
-     * Get the alternate endpoints for this invocation.
-     * 
-     * @param exchange the current Exchange     
-     * @return a failover endpoint if one is available
-     */
-    List<Endpoint> getAlternateEndpoints(Exchange exchange);
-    
-    /**
-     * Select one of the alternate endpoints for a retried invocation.
-     * 
-     * @param alternates List of alternate endpoints if available
-     * @return the selected endpoint
-     */
-    Endpoint selectAlternateEndpoint(List<Endpoint> alternates);
-
-    /**
-     * Get the alternate addresses for this invocation.
-     * These addresses over-ride any addresses specified in the WSDL.
-     * 
-     * @param exchange the current Exchange     
-     * @return a failover endpoint if one is available
-     */
-    List<String> getAlternateAddresses(Exchange exchange);
-
-    /**
-     * Select one of the alternate addresses for a retried invocation.
-     * 
-     * @param addresses List of alternate addresses if available
-     * @return the selected address
-     */
-    String selectAlternateAddress(List<String> addresses);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering;
+
+import java.util.List;
+
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
+
+/**
+ * Supports pluggable strategies for alternate endpoint selection on
+ * failover.
+ */
+public interface FailoverStrategy {
+    /**
+     * Get the alternate endpoints for this invocation.
+     *
+     * @param exchange the current Exchange
+     * @return the candidate endpoints a failover target may be selected from
+     */
+    List<Endpoint> getAlternateEndpoints(Exchange exchange);
+    
+    /**
+     * Select one of the alternate endpoints for a retried invocation.
+     *
+     * @param alternates List of alternate endpoints if available
+     * @return the selected endpoint; callers treat null as no target available
+     */
+    Endpoint selectAlternateEndpoint(List<Endpoint> alternates);
+
+    /**
+     * Get the alternate addresses for this invocation.
+     * These addresses over-ride any addresses specified in the WSDL.
+     *
+     * @param exchange the current Exchange
+     * @return the candidate addresses, or null to fall back to alternate endpoints
+     */
+    List<String> getAlternateAddresses(Exchange exchange);
+
+    /**
+     * Select one of the alternate addresses for a retried invocation.
+     *
+     * @param addresses List of alternate addresses if available
+     * @return the selected address; callers treat null as no address available
+     */
+    String selectAlternateAddress(List<String> addresses);
+}

Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java Fri Apr  6 16:21:14 2012
@@ -1,436 +1,436 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.clustering;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.endpoint.AbstractConduitSelector;
-import org.apache.cxf.endpoint.Client;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.endpoint.Retryable;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.service.model.BindingOperationInfo;
-import org.apache.cxf.transport.Conduit;
-
-
-/**
- * Implements a target selection strategy based on failover to an 
- * alternate target endpoint when a transport level failure is 
- * encountered.
- * Note that this feature changes the conduit on the fly and thus makes
- * the Client not thread safe.
- */
-public class FailoverTargetSelector extends AbstractConduitSelector {
-
-    private static final Logger LOG =
-        LogUtils.getL7dLogger(FailoverTargetSelector.class);
-    protected Map<InvocationKey, InvocationContext> inProgress 
-        = new ConcurrentHashMap<InvocationKey, InvocationContext>();;
-    protected FailoverStrategy failoverStrategy;
-    
-    /**
-     * Normal constructor.
-     */
-    public FailoverTargetSelector() {
-        super();
-    }
-    
-    /**
-     * Constructor, allowing a specific conduit to override normal selection.
-     * 
-     * @param c specific conduit
-     */
-    public FailoverTargetSelector(Conduit c) {
-        super(c);
-    }
-    
-    /**
-     * Called prior to the interceptor chain being traversed.
-     * 
-     * @param message the current Message
-     */
-    public synchronized void prepare(Message message) {
-        Exchange exchange = message.getExchange();
-        InvocationKey key = new InvocationKey(exchange);
-        if (!inProgress.containsKey(key)) {
-            Endpoint endpoint = exchange.get(Endpoint.class);
-            BindingOperationInfo bindingOperationInfo =
-                exchange.getBindingOperationInfo();
-            Object[] params = message.getContent(List.class).toArray();
-            Map<String, Object> context =
-                CastUtils.cast((Map<?, ?>)message.get(Message.INVOCATION_CONTEXT));
-            InvocationContext invocation = 
-                new InvocationContext(endpoint, 
-                                      bindingOperationInfo,
-                                      params,
-                                      context);
-            inProgress.put(key, invocation);
-        }
-    }
-
-    /**
-     * Called when a Conduit is actually required.
-     * 
-     * @param message
-     * @return the Conduit to use for mediation of the message
-     */
-    public Conduit selectConduit(Message message) {
-        Conduit c = message.get(Conduit.class);
-        if (c != null) {
-            return c;
-        }
-        return getSelectedConduit(message);
-    }
-
-    /**
-     * Called on completion of the MEP for which the Conduit was required.
-     * 
-     * @param exchange represents the completed MEP
-     */
-    public void complete(Exchange exchange) {
-        InvocationKey key = new InvocationKey(exchange);
-        InvocationContext invocation = null;
-        synchronized (this) {
-            invocation = inProgress.get(key);
-        }
-        boolean failover = false;
-        if (requiresFailover(exchange)) {
-            Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName());
-            
-            Endpoint failoverTarget = getFailoverTarget(exchange, invocation);
-            if (failoverTarget != null) {
-                setEndpoint(failoverTarget);
-                if (old != null) {
-                    old.close();
-                    conduits.remove(old);
-                }
-                Exception prevExchangeFault =
-                    (Exception)exchange.remove(Exception.class.getName());
-                Message outMessage = exchange.getOutMessage();
-                Exception prevMessageFault =
-                    outMessage.getContent(Exception.class);
-                outMessage.setContent(Exception.class, null);
-                overrideAddressProperty(invocation.getContext());
-                Retryable retry = exchange.get(Retryable.class);
-                exchange.clear();
-                if (retry != null) {
-                    try {
-                        failover = true;
-                        long delay = getDelayBetweenRetries();
-                        if (delay > 0) {
-                            Thread.sleep(delay);
-                        }
-                        retry.invoke(invocation.getBindingOperationInfo(),
-                                     invocation.getParams(),
-                                     invocation.getContext(),
-                                     exchange);
-                    } catch (Exception e) {
-                        if (exchange.get(Exception.class) != null) {
-                            exchange.put(Exception.class, prevExchangeFault);
-                        }
-                        if (outMessage.getContent(Exception.class) != null) {
-                            outMessage.setContent(Exception.class,
-                                                  prevMessageFault);
-                        }
-                    }
-                }
-            } else {
-                setEndpoint(invocation.retrieveOriginalEndpoint(endpoint));
-            }
-        }
-        if (!failover) {
-            getLogger().fine("FAILOVER_NOT_REQUIRED");
-            synchronized (this) {
-                inProgress.remove(key);
-            }
-            super.complete(exchange);
-        }
-    }
-    
-    /**
-     * @param strategy the FailoverStrategy to use
-     */
-    public synchronized void setStrategy(FailoverStrategy strategy) {
-        getLogger().log(Level.INFO, "USING_STRATEGY", new Object[] {strategy});
-        failoverStrategy = strategy;
-    }
-    
-    /**
-     * @return strategy the FailoverStrategy to use
-     */    
-    public synchronized FailoverStrategy getStrategy()  {
-        if (failoverStrategy == null) {
-            failoverStrategy = new SequentialStrategy();
-            getLogger().log(Level.INFO,
-                            "USING_STRATEGY",
-                            new Object[] {failoverStrategy});
-        }
-        return failoverStrategy;
-    }
-
-    /**
-     * @return the logger to use
-     */
-    protected Logger getLogger() {
-        return LOG;
-    }
-
-    /**
-     * Returns delay (in milliseconds) between retries
-     * @return delay, 0 means no delay
-     */
-    protected long getDelayBetweenRetries() {
-        FailoverStrategy strategy = getStrategy();
-        if (strategy instanceof AbstractStaticFailoverStrategy) {
-            return ((AbstractStaticFailoverStrategy)strategy).getDelayBetweenRetries();
-        }
-        //perhaps supporting FailoverTargetSelector specific property can make sense too
-        return 0;
-    }
-    
-    /**
-     * Check if the exchange is suitable for a failover.
-     * 
-     * @param exchange the current Exchange
-     * @return boolean true if a failover should be attempted
-     */
-    protected boolean requiresFailover(Exchange exchange) {
-        Message outMessage = exchange.getOutMessage();
-        Exception ex = outMessage.get(Exception.class) != null
-                       ? outMessage.get(Exception.class)
-                       : exchange.get(Exception.class);
-        getLogger().log(Level.FINE,
-                        "CHECK_LAST_INVOKE_FAILED",
-                        new Object[] {ex != null});
-        Throwable curr = ex;
-        boolean failover = false;
-        while (curr != null) {
-            failover = curr instanceof java.io.IOException;
-            curr = curr.getCause();
-        }
-        if (ex != null) {
-            getLogger().log(Level.INFO,
-                            "CHECK_FAILURE_IN_TRANSPORT",
-                            new Object[] {ex, failover});
-        }
-        return failover;
-    }
-    
-    /**
-     * Get the failover target endpoint, if a suitable one is available.
-     * 
-     * @param exchange the current Exchange
-     * @param invocation the current InvocationContext
-     * @return a failover endpoint if one is available
-     */
-    protected Endpoint getFailoverTarget(Exchange exchange,
-                                       InvocationContext invocation) {
-        List<String> alternateAddresses = null;
-        if (!invocation.hasAlternates()) {
-            // no previous failover attempt on this invocation
-            //
-            alternateAddresses = 
-                getStrategy().getAlternateAddresses(exchange);
-            if (alternateAddresses != null) {
-                invocation.setAlternateAddresses(alternateAddresses);
-            } else {
-                invocation.setAlternateEndpoints(
-                    getStrategy().getAlternateEndpoints(exchange));
-            }
-        } else {
-            alternateAddresses = invocation.getAlternateAddresses();
-        }
-
-        Endpoint failoverTarget = null;
-        if (alternateAddresses != null) {
-            String alternateAddress = 
-                getStrategy().selectAlternateAddress(alternateAddresses);
-            if (alternateAddress != null) {
-                // re-use current endpoint
-                //
-                failoverTarget = getEndpoint();
-
-                failoverTarget.getEndpointInfo().setAddress(alternateAddress);
-            }
-        } else {
-            failoverTarget = getStrategy().selectAlternateEndpoint(
-                                 invocation.getAlternateEndpoints());
-        }
-        return failoverTarget;
-    }
-    
-    /**
-     * Override the ENDPOINT_ADDRESS property in the request context
-     * 
-     * @param context the request context
-     */
-    protected void overrideAddressProperty(Map<String, Object> context) {
-        overrideAddressProperty(context, getEndpoint().getEndpointInfo().getAddress());
-    }
-    
-    protected void overrideAddressProperty(Map<String, Object> context,
-                                           String address) {
-        Map<String, Object> requestContext =
-            CastUtils.cast((Map<?, ?>)context.get(Client.REQUEST_CONTEXT));
-        if (requestContext != null) {
-            requestContext.put(Message.ENDPOINT_ADDRESS, address);
-            requestContext.put("javax.xml.ws.service.endpoint.address", address);
-        }
-    }
-    
-    // Some conduits may replace the endpoint address after it has already been prepared
-    // but before the invocation has been done (ex, org.apache.cxf.clustering.LoadDistributorTargetSelector)
-    // which may affect JAX-RS clients where actual endpoint address property may include additional path 
-    // segments.  
-    protected boolean replaceEndpointAddressPropertyIfNeeded(Message message, 
-                                                             String endpointAddress,
-                                                             Conduit cond) {
-        String requestURI = (String)message.get(Message.REQUEST_URI);
-        if (requestURI != null && endpointAddress != null && !requestURI.startsWith(endpointAddress)) {
-            String basePath = (String)message.get(Message.BASE_PATH);
-            if (basePath != null && requestURI.startsWith(basePath)) {
-                String pathInfo = requestURI.substring(basePath.length());
-                message.put(Message.BASE_PATH, endpointAddress);
-                final String slash = "/";
-                boolean startsWithSlash = pathInfo.startsWith(slash);
-                if (endpointAddress.endsWith(slash)) {
-                    endpointAddress = endpointAddress + (startsWithSlash ? pathInfo.substring(1) : pathInfo);
-                } else {
-                    endpointAddress = endpointAddress + (startsWithSlash ? pathInfo : (slash + pathInfo));
-                }
-                message.put(Message.ENDPOINT_ADDRESS, endpointAddress);
-
-                Exchange exchange = message.getExchange();
-                InvocationKey key = new InvocationKey(exchange);
-                InvocationContext invocation = inProgress.get(key);
-                if (invocation != null) {
-                    overrideAddressProperty(invocation.getContext(),
-                                            cond.getTarget().getAddress().getValue());
-                }
-                return true;
-            }
-        }
-        return false;
-    }
-            
-    /**
-     * Used to wrap an Exchange for usage as a Map key. The raw Exchange
-     * is not a suitable key type, as the hashCode is computed from its
-     * current contents, which may obviously change over the lifetime of
-     * an invocation.
-     */
-    protected static class InvocationKey {
-        private Exchange exchange;
-        
-        InvocationKey(Exchange ex) {
-            exchange = ex;     
-        }
-        
-        @Override
-        public int hashCode() {
-            return System.identityHashCode(exchange);
-        }
-        
-        @Override
-        public boolean equals(Object o) {
-            return o instanceof InvocationKey
-                   && exchange == ((InvocationKey)o).exchange;
-        }
-    }
-
-
-    /**
-     * Records the context of an invocation.
-     */
-    protected class InvocationContext {
-        private Endpoint originalEndpoint;
-        private String originalAddress;
-        private BindingOperationInfo bindingOperationInfo;
-        private Object[] params; 
-        private Map<String, Object> context;
-        private List<Endpoint> alternateEndpoints;
-        private List<String> alternateAddresses;
-        
-        InvocationContext(Endpoint endpoint,
-                          BindingOperationInfo boi,
-                          Object[] prms, 
-                          Map<String, Object> ctx) {
-            originalEndpoint = endpoint;
-            originalAddress = endpoint.getEndpointInfo().getAddress();
-            bindingOperationInfo = boi;
-            params = prms;
-            context = ctx;
-        }
-
-        Endpoint retrieveOriginalEndpoint(Endpoint endpoint) {
-            if (endpoint != originalEndpoint) {
-                getLogger().log(Level.INFO,
-                                "REVERT_TO_ORIGINAL_TARGET",
-                                endpoint.getEndpointInfo().getName());
-            }
-            if (!endpoint.getEndpointInfo().getAddress().equals(originalAddress)) {
-                endpoint.getEndpointInfo().setAddress(originalAddress);
-                getLogger().log(Level.INFO,
-                                "REVERT_TO_ORIGINAL_ADDRESS",
-                                endpoint.getEndpointInfo().getAddress());
-            }
-            return originalEndpoint;
-        }
-        
-        BindingOperationInfo getBindingOperationInfo() {
-            return bindingOperationInfo;
-        }
-        
-        Object[] getParams() {
-            return params;
-        }
-        
-        Map<String, Object> getContext() {
-            return context;
-        }
-        
-        List<Endpoint> getAlternateEndpoints() {
-            return alternateEndpoints;
-        }
-
-        List<String> getAlternateAddresses() {
-            return alternateAddresses;
-        }
-
-        void setAlternateEndpoints(List<Endpoint> alternates) {
-            alternateEndpoints = alternates;
-        }
-
-        void setAlternateAddresses(List<String> alternates) {
-            alternateAddresses = alternates;
-        }
-
-        boolean hasAlternates() {
-            return !(alternateEndpoints == null && alternateAddresses == null);
-        }
-    }    
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.AbstractConduitSelector;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.Retryable;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.transport.Conduit;
+
+
+/**
+ * Implements a target selection strategy based on failover to an 
+ * alternate target endpoint when a transport level failure is 
+ * encountered.
+ * Note that this feature changes the conduit on the fly and thus makes
+ * the Client not thread safe.
+ */
+public class FailoverTargetSelector extends AbstractConduitSelector {
+
+    private static final Logger LOG =
+        LogUtils.getL7dLogger(FailoverTargetSelector.class);
+    protected Map<InvocationKey, InvocationContext> inProgress // added by prepare(), removed by complete()
+        = new ConcurrentHashMap<InvocationKey, InvocationContext>();
+    protected FailoverStrategy failoverStrategy; // defaulted lazily by getStrategy()
+    
+    /**
+     * Creates a selector that is not bound to a specific conduit.
+     */
+    public FailoverTargetSelector() {
+        // no explicit super() call needed: the compiler inserts it
+    }
+    
+    /**
+     * Constructor, allowing a specific conduit to override normal selection.
+     *
+     * @param c specific conduit, passed straight to the superclass constructor
+     */
+    public FailoverTargetSelector(Conduit c) {
+        super(c);
+    }
+    
+    /**
+     * Records the invocation details of the exchange before the interceptor
+     * chain runs, unless that exchange is already registered as in progress.
+     * @param message the current Message
+     */
+    public synchronized void prepare(Message message) {
+        Exchange exchange = message.getExchange();
+        InvocationKey invocationKey = new InvocationKey(exchange);
+        if (inProgress.containsKey(invocationKey)) {
+            // already registered (complete() keeps the entry while a failover retry runs)
+            return;
+        }
+        Endpoint target = exchange.get(Endpoint.class);
+        BindingOperationInfo operation = exchange.getBindingOperationInfo();
+        Object[] arguments = message.getContent(List.class).toArray();
+        Map<String, Object> invocationCtx =
+            CastUtils.cast((Map<?, ?>)message.get(Message.INVOCATION_CONTEXT));
+        // Keep what complete() needs to replay the call: endpoint, operation,
+        // parameters and invocation context.
+        InvocationContext snapshot =
+            new InvocationContext(target, operation, arguments, invocationCtx);
+        inProgress.put(invocationKey, snapshot);
+    }
+
+    /**
+     * Called when a Conduit is actually required. A conduit already stored on
+     * the message is returned as is; otherwise the selected conduit is used.
+     * @param message the current Message
+     * @return the Conduit to use for mediation of the message
+     */
+    public Conduit selectConduit(Message message) {
+        // a conduit already attached to the message takes precedence
+        Conduit attached = message.get(Conduit.class);
+        return attached != null
+            ? attached
+            : getSelectedConduit(message);
+    }
+
+    /**
+     * Called on completion of the MEP for which the Conduit was required.
+     * Retries the invocation on a failover target when one is needed and found.
+     * @param exchange represents the completed MEP
+     */
+    public void complete(Exchange exchange) {
+        InvocationKey key = new InvocationKey(exchange);
+        InvocationContext invocation = null;
+        synchronized (this) {
+            invocation = inProgress.get(key); // NOTE(review): null if prepare() never registered this exchange
+        }
+        boolean failover = false;
+        if (requiresFailover(exchange)) {
+            Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName());
+            
+            Endpoint failoverTarget = getFailoverTarget(exchange, invocation);
+            if (failoverTarget != null) {
+                setEndpoint(failoverTarget);
+                if (old != null) {
+                    old.close(); // the conduit of the failed attempt is closed and forgotten
+                    conduits.remove(old);
+                }
+                Exception prevExchangeFault = // saved so the catch block below can put it back
+                    (Exception)exchange.remove(Exception.class.getName());
+                Message outMessage = exchange.getOutMessage();
+                Exception prevMessageFault =
+                    outMessage.getContent(Exception.class);
+                outMessage.setContent(Exception.class, null);
+                overrideAddressProperty(invocation.getContext());
+                Retryable retry = exchange.get(Retryable.class); // fetched before clear() wipes the exchange
+                exchange.clear();
+                if (retry != null) {
+                    try {
+                        failover = true; // set before the retry, so the cleanup at the end is skipped
+                        long delay = getDelayBetweenRetries();
+                        if (delay > 0) {
+                            Thread.sleep(delay);
+                        }
+                        retry.invoke(invocation.getBindingOperationInfo(),
+                                     invocation.getParams(),
+                                     invocation.getContext(),
+                                     exchange);
+                    } catch (Exception e) { // NOTE(review): also swallows InterruptedException from sleep()
+                        if (exchange.get(Exception.class) != null) {
+                            exchange.put(Exception.class, prevExchangeFault); // fault saved before the retry
+                        }
+                        if (outMessage.getContent(Exception.class) != null) {
+                            outMessage.setContent(Exception.class,
+                                                  prevMessageFault);
+                        }
+                    }
+                }
+            } else {
+                setEndpoint(invocation.retrieveOriginalEndpoint(endpoint)); // no target: back to the original
+            }
+        }
+        if (!failover) {
+            getLogger().fine("FAILOVER_NOT_REQUIRED");
+            synchronized (this) {
+                inProgress.remove(key); // invocation is finished, drop its record
+            }
+            super.complete(exchange);
+        }
+    }
+    
+    /**
+     * @param strategy the FailoverStrategy to use from now on (logged at INFO)
+     */
+    public synchronized void setStrategy(FailoverStrategy strategy) {
+        getLogger().log(Level.INFO, "USING_STRATEGY", new Object[] {strategy});
+        this.failoverStrategy = strategy;
+    }
+    
+    /**
+     * @return the FailoverStrategy in use; a SequentialStrategy if none was set
+     */
+    public synchronized FailoverStrategy getStrategy() {
+        if (failoverStrategy != null) {
+            return failoverStrategy;
+        }
+        // nothing configured: install the sequential strategy and log the choice
+        failoverStrategy = new SequentialStrategy();
+        getLogger().log(Level.INFO, "USING_STRATEGY", new Object[] {failoverStrategy});
+        return failoverStrategy;
+    }
+
+    /**
+     * @return the logger to use; this implementation returns the class-level LOG
+     */
+    protected Logger getLogger() {
+        return LOG;
+    }
+
+    /**
+     * Returns the delay (in milliseconds) to wait before a retry.
+     * @return the delay of an AbstractStaticFailoverStrategy, otherwise 0 (no delay)
+     */
+    protected long getDelayBetweenRetries() {
+        FailoverStrategy current = getStrategy();
+        if (!(current instanceof AbstractStaticFailoverStrategy)) {
+            // a selector-specific delay property could be supported here as well
+            return 0;
+        }
+        return ((AbstractStaticFailoverStrategy)current).getDelayBetweenRetries();
+    }
+    
+    /**
+     * Decide whether the failure recorded on the exchange warrants a failover.
+     * Only the deepest cause of the recorded exception is examined.
+     * @param exchange the current Exchange
+     * @return boolean true if a failover should be attempted
+     */
+    protected boolean requiresFailover(Exchange exchange) {
+        Exception ex = exchange.getOutMessage().get(Exception.class);
+        if (ex == null) {
+            ex = exchange.get(Exception.class);
+        }
+        getLogger().log(Level.FINE,
+                        "CHECK_LAST_INVOKE_FAILED",
+                        new Object[] {ex != null});
+        // Each step overwrites the flag, so only the last (deepest) cause in the
+        // chain decides: an IOException wrapping a non-IO cause does not count.
+        boolean failover = false;
+        for (Throwable t = ex; t != null; t = t.getCause()) {
+            failover = t instanceof java.io.IOException;
+        }
+        if (ex != null) {
+            getLogger().log(Level.INFO,
+                            "CHECK_FAILURE_IN_TRANSPORT",
+                            new Object[] {ex, failover});
+        }
+        return failover;
+    }
+    
+    /**
+     * Get the failover target endpoint, if a suitable one is available.
+     * Alternate addresses, when the strategy supplies any, win over endpoints.
+     * @param exchange the current Exchange
+     * @param invocation the current InvocationContext, which caches the alternates
+     * @return a failover endpoint if one is available (complete() treats null as none)
+     */
+    protected Endpoint getFailoverTarget(Exchange exchange,
+                                       InvocationContext invocation) {
+        List<String> alternateAddresses = null;
+        if (!invocation.hasAlternates()) {
+            // no previous failover attempt on this invocation:
+            // ask the strategy and cache its answer on the invocation
+            alternateAddresses = 
+                getStrategy().getAlternateAddresses(exchange);
+            if (alternateAddresses != null) {
+                invocation.setAlternateAddresses(alternateAddresses);
+            } else {
+                invocation.setAlternateEndpoints(
+                    getStrategy().getAlternateEndpoints(exchange));
+            }
+        } else {
+            alternateAddresses = invocation.getAlternateAddresses(); // null when endpoints were cached
+        }
+
+        Endpoint failoverTarget = null;
+        if (alternateAddresses != null) {
+            String alternateAddress = 
+                getStrategy().selectAlternateAddress(alternateAddresses);
+            if (alternateAddress != null) {
+                // re-use current endpoint: only its address is changed, and
+                // that change is made in place on the endpoint's EndpointInfo
+                failoverTarget = getEndpoint();
+
+                failoverTarget.getEndpointInfo().setAddress(alternateAddress);
+            }
+        } else {
+            failoverTarget = getStrategy().selectAlternateEndpoint(
+                                 invocation.getAlternateEndpoints());
+        }
+        return failoverTarget;
+    }
+    
+    /**
+     * Write the address of the current endpoint into the request context.
+     * @param context map holding the request context under Client.REQUEST_CONTEXT
+     */
+    protected void overrideAddressProperty(Map<String, Object> context) {
+        String currentAddress = getEndpoint().getEndpointInfo().getAddress();
+        overrideAddressProperty(context, currentAddress);
+    }
+    
+    protected void overrideAddressProperty(Map<String, Object> context, String address) {
+        Map<String, Object> reqCtx = CastUtils.cast((Map<?, ?>)context.get(Client.REQUEST_CONTEXT));
+        if (reqCtx == null) {
+            // nothing to override without a request context
+            return;
+        }
+        reqCtx.put(Message.ENDPOINT_ADDRESS, address);
+        reqCtx.put("javax.xml.ws.service.endpoint.address", address);
+    }
+    
+    // A conduit may replace the endpoint address after the message has been prepared
+    // but before the invocation is done (ex, org.apache.cxf.clustering.LoadDistributorTargetSelector).
+    // JAX-RS clients are affected, since their actual endpoint address property may
+    // include additional path segments; those segments are carried over to the new address.
+    protected boolean replaceEndpointAddressPropertyIfNeeded(Message message, 
+                                                             String endpointAddress,
+                                                             Conduit cond) {
+        String uri = (String)message.get(Message.REQUEST_URI);
+        if (uri == null || endpointAddress == null || uri.startsWith(endpointAddress)) {
+            return false;
+        }
+
+        String oldBase = (String)message.get(Message.BASE_PATH);
+        if (oldBase == null || !uri.startsWith(oldBase)) {
+            return false;
+        }
+
+        // Join the new address and the old trailing path so that one slash sits at the seam.
+        String tail = uri.substring(oldBase.length());
+        message.put(Message.BASE_PATH, endpointAddress);
+        boolean addressEndsWithSlash = endpointAddress.endsWith("/");
+        boolean tailStartsWithSlash = tail.startsWith("/");
+        String separator = addressEndsWithSlash || tailStartsWithSlash ? "" : "/";
+        String suffix = addressEndsWithSlash && tailStartsWithSlash ? tail.substring(1) : tail;
+        message.put(Message.ENDPOINT_ADDRESS, endpointAddress + separator + suffix);
+
+        // Mirror the conduit's target address into the context of the in-progress invocation.
+        InvocationContext pending = inProgress.get(new InvocationKey(message.getExchange()));
+        if (pending != null) {
+            overrideAddressProperty(pending.getContext(),
+                                    cond.getTarget().getAddress().getValue());
+        }
+        return true;
+    }
+            
+    /**
+     * Identity-based Map key wrapping an Exchange. The Exchange itself is
+     * not a suitable key, as its hashCode is computed from its current
+     * contents, which may change over the lifetime of an invocation;
+     * this wrapper hashes and compares by object identity instead.
+     */
+    protected static class InvocationKey {
+        private final Exchange exchange;
+
+        InvocationKey(Exchange ex) {
+            this.exchange = ex;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            boolean sameType = o instanceof InvocationKey;
+            return sameType && ((InvocationKey)o).exchange == exchange;
+        }
+
+        @Override
+        public int hashCode() {
+            return System.identityHashCode(exchange);
+        }
+    }
+
+
+    /**
+     * Captures the state of one invocation: original endpoint and address, operation,
+     * parameters, context, and any alternate endpoints or addresses recorded for it.
+     */
+    protected class InvocationContext {
+        private final Endpoint originalEndpoint;
+        private final String originalAddress;
+        private final BindingOperationInfo bindingOperationInfo;
+        private final Object[] params;
+        private final Map<String, Object> context;
+        private List<String> alternateAddresses;
+        private List<Endpoint> alternateEndpoints;
+
+        InvocationContext(Endpoint endpoint,
+                          BindingOperationInfo boi,
+                          Object[] prms,
+                          Map<String, Object> ctx) {
+            this.originalEndpoint = endpoint;
+            this.originalAddress = endpoint.getEndpointInfo().getAddress();
+            this.bindingOperationInfo = boi;
+            this.params = prms;
+            this.context = ctx;
+        }
+
+        boolean hasAlternates() {
+            return alternateEndpoints != null || alternateAddresses != null;
+        }
+
+        List<String> getAlternateAddresses() {
+            return alternateAddresses;
+        }
+
+        void setAlternateAddresses(List<String> alternates) {
+            this.alternateAddresses = alternates;
+        }
+
+        List<Endpoint> getAlternateEndpoints() {
+            return alternateEndpoints;
+        }
+
+        void setAlternateEndpoints(List<Endpoint> alternates) {
+            this.alternateEndpoints = alternates;
+        }
+
+        BindingOperationInfo getBindingOperationInfo() {
+            return bindingOperationInfo;
+        }
+
+        Object[] getParams() {
+            return params;
+        }
+
+        Map<String, Object> getContext() {
+            return context;
+        }
+
+        // Undo any failover retargeting; note the address is reset on the endpoint passed in.
+        Endpoint retrieveOriginalEndpoint(Endpoint endpoint) {
+            if (originalEndpoint != endpoint) {
+                getLogger().log(Level.INFO, "REVERT_TO_ORIGINAL_TARGET",
+                                endpoint.getEndpointInfo().getName());
+            }
+            if (!endpoint.getEndpointInfo().getAddress().equals(originalAddress)) {
+                endpoint.getEndpointInfo().setAddress(originalAddress);
+                getLogger().log(Level.INFO, "REVERT_TO_ORIGINAL_ADDRESS",
+                                endpoint.getEndpointInfo().getAddress());
+            }
+            return originalEndpoint;
+        }
+    }
+}

Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java Fri Apr  6 16:21:14 2012
@@ -1,36 +1,36 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.clustering;
-
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-
-/**
- * This feature may be applied to a Client so as to enable
- * load distribution amongst a set of target endpoints or addresses
- * Note that this feature changes the conduit on the fly and thus makes
- * the Client not thread safe.
- */
-@NoJSR250Annotations
-public class LoadDistributorFeature extends FailoverFeature {
-
-    @Override
-    public FailoverTargetSelector getTargetSelector() {
-        return new LoadDistributorTargetSelector();
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.clustering;
+
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+
+/**
+ * A feature that may be applied to a Client to distribute load amongst a
+ * set of target endpoints or addresses. It changes the conduit on the fly,
+ * which makes the Client not thread safe.
+ */
+@NoJSR250Annotations
+public class LoadDistributorFeature extends FailoverFeature {
+
+    @Override
+    public FailoverTargetSelector getTargetSelector() {
+        FailoverTargetSelector selector = new LoadDistributorTargetSelector();
+        return selector;
+    }
+}