You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/04/06 18:21:15 UTC
svn commit: r1310446 [2/4] - in /cxf/trunk:
rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/wsdl/extensions/
rt/core/src/main/java/org/apache/cxf/bus/extension/
rt/core/src/main/java/org/apache/cxf/bus/osgi/
rt/core/src/main/java/org/apache/...
Modified: cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java (original)
+++ cxf/trunk/rt/core/src/test/java/org/apache/cxf/transport/common/gzip/GZIPAcceptEncodingTest.java Fri Apr 6 16:21:14 2012
@@ -1,139 +1,139 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.common.gzip;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.cxf.helpers.HttpHeaderHelper;
-import org.apache.cxf.interceptor.Fault;
-import org.apache.cxf.interceptor.InterceptorChain;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.message.ExchangeImpl;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.message.MessageImpl;
-import org.easymock.EasyMock;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.FORCE;
-import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.YES;
-
-/**
- * Test for the parsing of Accept-Encoding by the GZIPOutInterceptor. For
- * Accept-Encoding values that enable gzip we expect an extra interceptor to be
- * added to the out message, and the {@link GZIPOutInterceptor#USE_GZIP_KEY} to
- * be set correctly. For Accept-Encoding values that do not enable gzip the
- * interceptor should not be added.
- */
-public class GZIPAcceptEncodingTest extends Assert {
-
- private GZIPOutInterceptor interceptor;
- private Message inMessage;
- private Message outMessage;
- private InterceptorChain outInterceptors;
-
- @Before
- public void setUp() throws Exception {
- interceptor = new GZIPOutInterceptor();
- inMessage = new MessageImpl();
- outMessage = new MessageImpl();
- Exchange exchange = new ExchangeImpl();
- exchange.setInMessage(inMessage);
- inMessage.setExchange(exchange);
- inMessage.setContent(InputStream.class, new ByteArrayInputStream(new byte[0]));
- exchange.setOutMessage(outMessage);
- outMessage.setExchange(exchange);
- outMessage.setContent(OutputStream.class, new ByteArrayOutputStream());
- outInterceptors = EasyMock.createMock(InterceptorChain.class);
- outMessage.setInterceptorChain(outInterceptors);
- }
-
- @Test
- public void testNoAcceptEncoding() throws Exception {
- EasyMock.replay(outInterceptors);
- interceptor.handleMessage(outMessage);
- }
-
- @Test
- public void testAcceptGzip() throws Exception {
- singleTest("gzip", true, YES, "gzip");
- }
-
- @Test
- public void testAcceptXGzip() throws Exception {
- singleTest("x-gzip, x-compress", true, YES, "x-gzip");
- }
-
- @Test
- public void testAcceptStar() throws Exception {
- singleTest("*", true, YES, "gzip");
- }
-
- @Test
- public void testAcceptOnlyGzip() throws Exception {
- singleTest("gzip, identity; q=0", true, FORCE, "gzip");
- }
-
- @Test
- public void testOnlyIdentitySupported() throws Exception {
- singleTest("deflate", false, null, null);
- }
-
- @Test
- public void testGzipExplicitlyDisabled() throws Exception {
- singleTest("gzip; q=0.00", false, null, null);
- }
-
- @Test(expected = Fault.class)
- public void testNoValidEncodings() throws Exception {
- EasyMock.replay();
- setAcceptEncoding("*;q=0, deflate;q=0.5");
- interceptor.handleMessage(outMessage);
- }
-
- private void singleTest(String encoding, boolean expectEndingInterceptor,
- GZIPOutInterceptor.UseGzip expectedUseGzip, String expectedGzipEncoding)
- throws Exception {
-
- EasyMock.replay(outInterceptors);
- setAcceptEncoding(encoding);
- interceptor.handleMessage(outMessage);
- assertSame("Wrong value of " + GZIPOutInterceptor.USE_GZIP_KEY, expectedUseGzip, outMessage
- .get(GZIPOutInterceptor.USE_GZIP_KEY));
- assertEquals("Wrong value of " + GZIPOutInterceptor.GZIP_ENCODING_KEY, expectedGzipEncoding,
- outMessage.get(GZIPOutInterceptor.GZIP_ENCODING_KEY));
- }
-
- private void setAcceptEncoding(String enc) {
- Map<String, List<String>> protocolHeaders
- = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
- protocolHeaders.put(HttpHeaderHelper.getHeaderKey(HttpHeaderHelper.ACCEPT_ENCODING), Collections
- .singletonList(enc));
- inMessage.put(Message.PROTOCOL_HEADERS, protocolHeaders);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.common.gzip;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.cxf.helpers.HttpHeaderHelper;
+import org.apache.cxf.interceptor.Fault;
+import org.apache.cxf.interceptor.InterceptorChain;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.easymock.EasyMock;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.FORCE;
+import static org.apache.cxf.transport.common.gzip.GZIPOutInterceptor.UseGzip.YES;
+
+/**
+ * Test for the parsing of Accept-Encoding by the GZIPOutInterceptor. For
+ * Accept-Encoding values that enable gzip we expect an extra interceptor to be
+ * added to the out message, and the {@link GZIPOutInterceptor#USE_GZIP_KEY} to
+ * be set correctly. For Accept-Encoding values that do not enable gzip the
+ * interceptor should not be added.
+ */
+public class GZIPAcceptEncodingTest extends Assert {
+
+ private GZIPOutInterceptor interceptor;
+ private Message inMessage;
+ private Message outMessage;
+ private InterceptorChain outInterceptors;
+
+ @Before
+ public void setUp() throws Exception {
+ interceptor = new GZIPOutInterceptor();
+ inMessage = new MessageImpl();
+ outMessage = new MessageImpl();
+ Exchange exchange = new ExchangeImpl();
+ exchange.setInMessage(inMessage);
+ inMessage.setExchange(exchange);
+ inMessage.setContent(InputStream.class, new ByteArrayInputStream(new byte[0]));
+ exchange.setOutMessage(outMessage);
+ outMessage.setExchange(exchange);
+ outMessage.setContent(OutputStream.class, new ByteArrayOutputStream());
+ outInterceptors = EasyMock.createMock(InterceptorChain.class);
+ outMessage.setInterceptorChain(outInterceptors);
+ }
+
+ @Test
+ public void testNoAcceptEncoding() throws Exception {
+ EasyMock.replay(outInterceptors);
+ interceptor.handleMessage(outMessage);
+ }
+
+ @Test
+ public void testAcceptGzip() throws Exception {
+ singleTest("gzip", true, YES, "gzip");
+ }
+
+ @Test
+ public void testAcceptXGzip() throws Exception {
+ singleTest("x-gzip, x-compress", true, YES, "x-gzip");
+ }
+
+ @Test
+ public void testAcceptStar() throws Exception {
+ singleTest("*", true, YES, "gzip");
+ }
+
+ @Test
+ public void testAcceptOnlyGzip() throws Exception {
+ singleTest("gzip, identity; q=0", true, FORCE, "gzip");
+ }
+
+ @Test
+ public void testOnlyIdentitySupported() throws Exception {
+ singleTest("deflate", false, null, null);
+ }
+
+ @Test
+ public void testGzipExplicitlyDisabled() throws Exception {
+ singleTest("gzip; q=0.00", false, null, null);
+ }
+
+ @Test(expected = Fault.class)
+ public void testNoValidEncodings() throws Exception {
+ EasyMock.replay();
+ setAcceptEncoding("*;q=0, deflate;q=0.5");
+ interceptor.handleMessage(outMessage);
+ }
+
+ private void singleTest(String encoding, boolean expectEndingInterceptor,
+ GZIPOutInterceptor.UseGzip expectedUseGzip, String expectedGzipEncoding)
+ throws Exception {
+
+ EasyMock.replay(outInterceptors);
+ setAcceptEncoding(encoding);
+ interceptor.handleMessage(outMessage);
+ assertSame("Wrong value of " + GZIPOutInterceptor.USE_GZIP_KEY, expectedUseGzip, outMessage
+ .get(GZIPOutInterceptor.USE_GZIP_KEY));
+ assertEquals("Wrong value of " + GZIPOutInterceptor.GZIP_ENCODING_KEY, expectedGzipEncoding,
+ outMessage.get(GZIPOutInterceptor.GZIP_ENCODING_KEY));
+ }
+
+ private void setAcceptEncoding(String enc) {
+ Map<String, List<String>> protocolHeaders
+ = new TreeMap<String, List<String>>(String.CASE_INSENSITIVE_ORDER);
+ protocolHeaders.put(HttpHeaderHelper.getHeaderKey(HttpHeaderHelper.ACCEPT_ENCODING), Collections
+ .singletonList(enc));
+ inMessage.put(Message.PROTOCOL_HEADERS, protocolHeaders);
+ }
+}
Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/AbstractStaticFailoverStrategy.java Fri Apr 6 16:21:14 2012
@@ -1,165 +1,165 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.clustering;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.xml.namespace.QName;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.service.model.ServiceInfo;
-
-/**
- * Failover strategy based on a static cluster represented by
- * multiple endpoints associated with the same service instance.
- */
-public abstract class AbstractStaticFailoverStrategy implements FailoverStrategy {
- private static final Logger LOG =
- LogUtils.getL7dLogger(AbstractStaticFailoverStrategy.class);
-
- private List<String> alternateAddresses;
- private long delayBetweenRetries;
-
- public void setDelayBetweenRetries(long delay) {
- this.delayBetweenRetries = delay;
- }
-
- public long getDelayBetweenRetries() {
- return this.delayBetweenRetries;
- }
-
- public void setAlternateAddresses(List<String> alternateAddresses) {
- this.alternateAddresses = alternateAddresses;
- }
-
- /**
- * Get the alternate addresses for this invocation.
- *
- * @param exchange the current Exchange
- * @return a List of alternate addresses if available
- */
- public List<String> getAlternateAddresses(Exchange exchange) {
- return alternateAddresses != null
- ? new ArrayList<String>(alternateAddresses)
- : null;
- }
-
- /**
- * Select one of the alternate addresses for a retried invocation.
- *
- * @param a List of alternate addresses if available
- * @return the selected address
- */
- public String selectAlternateAddress(List<String> alternates) {
- String selected = null;
- if (alternates != null && alternates.size() > 0) {
- selected = getNextAlternate(alternates);
- LOG.log(Level.WARNING,
- "FAILING_OVER_TO_ADDRESS_OVERRIDE",
- selected);
- } else {
- LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
- }
- return selected;
- }
-
- /**
- * Get the alternate endpoints for this invocation.
- *
- * @param exchange the current Exchange
- * @return a List of alternate endpoints if available
- */
- public List<Endpoint> getAlternateEndpoints(Exchange exchange) {
- return getEndpoints(exchange, false);
- }
-
- /**
- * Select one of the alternate endpoints for a retried invocation.
- *
- * @param a List of alternate endpoints if available
- * @return the selected endpoint
- */
- public Endpoint selectAlternateEndpoint(List<Endpoint> alternates) {
- Endpoint selected = null;
- if (alternates != null && alternates.size() > 0) {
- selected = getNextAlternate(alternates);
- LOG.log(Level.WARNING,
- "FAILING_OVER_TO_ALTERNATE_ENDPOINT",
- new Object[] {selected.getEndpointInfo().getName(),
- selected.getEndpointInfo().getAddress()});
- } else {
- LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
- }
- return selected;
- }
-
- /**
- * Get the endpoints for this invocation.
- *
- * @param exchange the current Exchange
- * @param acceptCandidatesWithSameAddress true to accept candidates with the same address
- * @return a List of alternate endpoints if available
- */
- protected List<Endpoint> getEndpoints(Exchange exchange, boolean acceptCandidatesWithSameAddress) {
- Endpoint endpoint = exchange.get(Endpoint.class);
- Collection<ServiceInfo> services = endpoint.getService().getServiceInfos();
- QName currentBinding = endpoint.getBinding().getBindingInfo().getName();
- List<Endpoint> alternates = new ArrayList<Endpoint>();
- for (ServiceInfo service : services) {
- Collection<EndpointInfo> candidates = service.getEndpoints();
- for (EndpointInfo candidate : candidates) {
- QName candidateBinding = candidate.getBinding().getName();
- if (candidateBinding.equals(currentBinding)) {
- if (acceptCandidatesWithSameAddress || !candidate.getAddress().equals(
- endpoint.getEndpointInfo().getAddress())) {
- Endpoint alternate =
- endpoint.getService().getEndpoints().get(candidate.getName());
- if (alternate != null) {
- LOG.log(Level.INFO,
- "FAILOVER_CANDIDATE_ACCEPTED",
- candidate.getName());
- alternates.add(alternate);
- }
- }
- } else {
- LOG.log(Level.INFO,
- "FAILOVER_CANDIDATE_REJECTED",
- new Object[] {candidate.getName(), candidateBinding});
- }
- }
- }
- return alternates;
- }
-
- /**
- * Get next alternate endpoint.
- *
- * @param alternates non-empty List of alternate endpoints
- * @return
- */
- protected abstract <T> T getNextAlternate(List<T> alternates);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.service.model.ServiceInfo;
+
+/**
+ * Failover strategy based on a static cluster represented by
+ * multiple endpoints associated with the same service instance.
+ */
+public abstract class AbstractStaticFailoverStrategy implements FailoverStrategy {
+ private static final Logger LOG =
+ LogUtils.getL7dLogger(AbstractStaticFailoverStrategy.class);
+
+ private List<String> alternateAddresses;
+ private long delayBetweenRetries;
+
+ public void setDelayBetweenRetries(long delay) {
+ this.delayBetweenRetries = delay;
+ }
+
+ public long getDelayBetweenRetries() {
+ return this.delayBetweenRetries;
+ }
+
+ public void setAlternateAddresses(List<String> alternateAddresses) {
+ this.alternateAddresses = alternateAddresses;
+ }
+
+ /**
+ * Get the alternate addresses for this invocation.
+ *
+ * @param exchange the current Exchange
+ * @return a List of alternate addresses if available
+ */
+ public List<String> getAlternateAddresses(Exchange exchange) {
+ return alternateAddresses != null
+ ? new ArrayList<String>(alternateAddresses)
+ : null;
+ }
+
+ /**
+ * Select one of the alternate addresses for a retried invocation.
+ *
+ * @param a List of alternate addresses if available
+ * @return the selected address
+ */
+ public String selectAlternateAddress(List<String> alternates) {
+ String selected = null;
+ if (alternates != null && alternates.size() > 0) {
+ selected = getNextAlternate(alternates);
+ LOG.log(Level.WARNING,
+ "FAILING_OVER_TO_ADDRESS_OVERRIDE",
+ selected);
+ } else {
+ LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
+ }
+ return selected;
+ }
+
+ /**
+ * Get the alternate endpoints for this invocation.
+ *
+ * @param exchange the current Exchange
+ * @return a List of alternate endpoints if available
+ */
+ public List<Endpoint> getAlternateEndpoints(Exchange exchange) {
+ return getEndpoints(exchange, false);
+ }
+
+ /**
+ * Select one of the alternate endpoints for a retried invocation.
+ *
+ * @param a List of alternate endpoints if available
+ * @return the selected endpoint
+ */
+ public Endpoint selectAlternateEndpoint(List<Endpoint> alternates) {
+ Endpoint selected = null;
+ if (alternates != null && alternates.size() > 0) {
+ selected = getNextAlternate(alternates);
+ LOG.log(Level.WARNING,
+ "FAILING_OVER_TO_ALTERNATE_ENDPOINT",
+ new Object[] {selected.getEndpointInfo().getName(),
+ selected.getEndpointInfo().getAddress()});
+ } else {
+ LOG.warning("NO_ALTERNATE_TARGETS_REMAIN");
+ }
+ return selected;
+ }
+
+ /**
+ * Get the endpoints for this invocation.
+ *
+ * @param exchange the current Exchange
+ * @param acceptCandidatesWithSameAddress true to accept candidates with the same address
+ * @return a List of alternate endpoints if available
+ */
+ protected List<Endpoint> getEndpoints(Exchange exchange, boolean acceptCandidatesWithSameAddress) {
+ Endpoint endpoint = exchange.get(Endpoint.class);
+ Collection<ServiceInfo> services = endpoint.getService().getServiceInfos();
+ QName currentBinding = endpoint.getBinding().getBindingInfo().getName();
+ List<Endpoint> alternates = new ArrayList<Endpoint>();
+ for (ServiceInfo service : services) {
+ Collection<EndpointInfo> candidates = service.getEndpoints();
+ for (EndpointInfo candidate : candidates) {
+ QName candidateBinding = candidate.getBinding().getName();
+ if (candidateBinding.equals(currentBinding)) {
+ if (acceptCandidatesWithSameAddress || !candidate.getAddress().equals(
+ endpoint.getEndpointInfo().getAddress())) {
+ Endpoint alternate =
+ endpoint.getService().getEndpoints().get(candidate.getName());
+ if (alternate != null) {
+ LOG.log(Level.INFO,
+ "FAILOVER_CANDIDATE_ACCEPTED",
+ candidate.getName());
+ alternates.add(alternate);
+ }
+ }
+ } else {
+ LOG.log(Level.INFO,
+ "FAILOVER_CANDIDATE_REJECTED",
+ new Object[] {candidate.getName(), candidateBinding});
+ }
+ }
+ }
+ return alternates;
+ }
+
+ /**
+ * Get next alternate endpoint.
+ *
+ * @param alternates non-empty List of alternate endpoints
+ * @return
+ */
+ protected abstract <T> T getNextAlternate(List<T> alternates);
+}
Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverFeature.java Fri Apr 6 16:21:14 2012
@@ -1,84 +1,84 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.clustering;
-
-import org.apache.cxf.Bus;
-import org.apache.cxf.annotations.EvaluateAllEndpoints;
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-import org.apache.cxf.endpoint.Client;
-import org.apache.cxf.endpoint.ConduitSelector;
-import org.apache.cxf.endpoint.ConduitSelectorHolder;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.feature.AbstractFeature;
-import org.apache.cxf.interceptor.InterceptorProvider;
-
-/**
- * This feature may be applied to a Client so as to enable
- * failover from the initial target endpoint to any other
- * compatible endpoint for the target service.
- */
-@NoJSR250Annotations
-@EvaluateAllEndpoints
-public class FailoverFeature extends AbstractFeature {
-
- private FailoverStrategy failoverStrategy;
- private FailoverTargetSelector targetSelector;
-
- @Override
- protected void initializeProvider(InterceptorProvider provider, Bus bus) {
- if (provider instanceof ConduitSelectorHolder) {
- ConduitSelectorHolder csHolder = (ConduitSelectorHolder) provider;
- Endpoint endpoint = csHolder.getConduitSelector().getEndpoint();
- ConduitSelector conduitSelector = initTargetSelector(endpoint);
- csHolder.setConduitSelector(conduitSelector);
- }
- }
-
- @Override
- public void initialize(Client client, Bus bus) {
- ConduitSelector selector = initTargetSelector(client.getConduitSelector().getEndpoint());
- client.setConduitSelector(selector);
- }
-
- protected ConduitSelector initTargetSelector(Endpoint endpoint) {
- FailoverTargetSelector selector = getTargetSelector();
- selector.setEndpoint(endpoint);
- selector.setStrategy(getStrategy());
- return selector;
- }
-
- public FailoverTargetSelector getTargetSelector() {
- if (this.targetSelector == null) {
- this.targetSelector = new FailoverTargetSelector();
- }
- return this.targetSelector;
- }
-
- public void setTargetSelector(FailoverTargetSelector selector) {
- this.targetSelector = selector;
- }
-
- public void setStrategy(FailoverStrategy strategy) {
- failoverStrategy = strategy;
- }
-
- public FailoverStrategy getStrategy() {
- return failoverStrategy;
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.clustering;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.annotations.EvaluateAllEndpoints;
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ConduitSelector;
+import org.apache.cxf.endpoint.ConduitSelectorHolder;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.feature.AbstractFeature;
+import org.apache.cxf.interceptor.InterceptorProvider;
+
+/**
+ * This feature may be applied to a Client so as to enable
+ * failover from the initial target endpoint to any other
+ * compatible endpoint for the target service.
+ */
+@NoJSR250Annotations
+@EvaluateAllEndpoints
+public class FailoverFeature extends AbstractFeature {
+
+ private FailoverStrategy failoverStrategy;
+ private FailoverTargetSelector targetSelector;
+
+ @Override
+ protected void initializeProvider(InterceptorProvider provider, Bus bus) {
+ if (provider instanceof ConduitSelectorHolder) {
+ ConduitSelectorHolder csHolder = (ConduitSelectorHolder) provider;
+ Endpoint endpoint = csHolder.getConduitSelector().getEndpoint();
+ ConduitSelector conduitSelector = initTargetSelector(endpoint);
+ csHolder.setConduitSelector(conduitSelector);
+ }
+ }
+
+ @Override
+ public void initialize(Client client, Bus bus) {
+ ConduitSelector selector = initTargetSelector(client.getConduitSelector().getEndpoint());
+ client.setConduitSelector(selector);
+ }
+
+ protected ConduitSelector initTargetSelector(Endpoint endpoint) {
+ FailoverTargetSelector selector = getTargetSelector();
+ selector.setEndpoint(endpoint);
+ selector.setStrategy(getStrategy());
+ return selector;
+ }
+
+ public FailoverTargetSelector getTargetSelector() {
+ if (this.targetSelector == null) {
+ this.targetSelector = new FailoverTargetSelector();
+ }
+ return this.targetSelector;
+ }
+
+ public void setTargetSelector(FailoverTargetSelector selector) {
+ this.targetSelector = selector;
+ }
+
+ public void setStrategy(FailoverStrategy strategy) {
+ failoverStrategy = strategy;
+ }
+
+ public FailoverStrategy getStrategy() {
+ return failoverStrategy;
+ }
+}
Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverStrategy.java Fri Apr 6 16:21:14 2012
@@ -1,64 +1,64 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.clustering;
-
-import java.util.List;
-
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.message.Exchange;
-
-/**
- * Supports pluggable strategies for alternate endpoint selection on
- * failover.
- */
-public interface FailoverStrategy {
- /**
- * Get the alternate endpoints for this invocation.
- *
- * @param exchange the current Exchange
- * @return a failover endpoint if one is available
- */
- List<Endpoint> getAlternateEndpoints(Exchange exchange);
-
- /**
- * Select one of the alternate endpoints for a retried invocation.
- *
- * @param alternates List of alternate endpoints if available
- * @return the selected endpoint
- */
- Endpoint selectAlternateEndpoint(List<Endpoint> alternates);
-
- /**
- * Get the alternate addresses for this invocation.
- * These addresses over-ride any addresses specified in the WSDL.
- *
- * @param exchange the current Exchange
- * @return a failover endpoint if one is available
- */
- List<String> getAlternateAddresses(Exchange exchange);
-
- /**
- * Select one of the alternate addresses for a retried invocation.
- *
- * @param addresses List of alternate addresses if available
- * @return the selected address
- */
- String selectAlternateAddress(List<String> addresses);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering;
+
+import java.util.List;
+
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.message.Exchange;
+
+/**
+ * Supports pluggable strategies for alternate endpoint selection on
+ * failover.
+ */
+public interface FailoverStrategy {
+ /**
+ * Get the alternate endpoints for this invocation.
+ *
+ * @param exchange the current Exchange
+ * @return a failover endpoint if one is available
+ */
+ List<Endpoint> getAlternateEndpoints(Exchange exchange);
+
+ /**
+ * Select one of the alternate endpoints for a retried invocation.
+ *
+ * @param alternates List of alternate endpoints if available
+ * @return the selected endpoint
+ */
+ Endpoint selectAlternateEndpoint(List<Endpoint> alternates);
+
+ /**
+ * Get the alternate addresses for this invocation.
+ * These addresses over-ride any addresses specified in the WSDL.
+ *
+ * @param exchange the current Exchange
+ * @return a failover endpoint if one is available
+ */
+ List<String> getAlternateAddresses(Exchange exchange);
+
+ /**
+ * Select one of the alternate addresses for a retried invocation.
+ *
+ * @param addresses List of alternate addresses if available
+ * @return the selected address
+ */
+ String selectAlternateAddress(List<String> addresses);
+}
Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/FailoverTargetSelector.java Fri Apr 6 16:21:14 2012
@@ -1,436 +1,436 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cxf.clustering;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.endpoint.AbstractConduitSelector;
-import org.apache.cxf.endpoint.Client;
-import org.apache.cxf.endpoint.Endpoint;
-import org.apache.cxf.endpoint.Retryable;
-import org.apache.cxf.helpers.CastUtils;
-import org.apache.cxf.message.Exchange;
-import org.apache.cxf.message.Message;
-import org.apache.cxf.service.model.BindingOperationInfo;
-import org.apache.cxf.transport.Conduit;
-
-
-/**
- * Implements a target selection strategy based on failover to an
- * alternate target endpoint when a transport level failure is
- * encountered.
- * Note that this feature changes the conduit on the fly and thus makes
- * the Client not thread safe.
- */
-public class FailoverTargetSelector extends AbstractConduitSelector {
-
- private static final Logger LOG =
- LogUtils.getL7dLogger(FailoverTargetSelector.class);
- protected Map<InvocationKey, InvocationContext> inProgress
- = new ConcurrentHashMap<InvocationKey, InvocationContext>();;
- protected FailoverStrategy failoverStrategy;
-
- /**
- * Normal constructor.
- */
- public FailoverTargetSelector() {
- super();
- }
-
- /**
- * Constructor, allowing a specific conduit to override normal selection.
- *
- * @param c specific conduit
- */
- public FailoverTargetSelector(Conduit c) {
- super(c);
- }
-
- /**
- * Called prior to the interceptor chain being traversed.
- *
- * @param message the current Message
- */
- public synchronized void prepare(Message message) {
- Exchange exchange = message.getExchange();
- InvocationKey key = new InvocationKey(exchange);
- if (!inProgress.containsKey(key)) {
- Endpoint endpoint = exchange.get(Endpoint.class);
- BindingOperationInfo bindingOperationInfo =
- exchange.getBindingOperationInfo();
- Object[] params = message.getContent(List.class).toArray();
- Map<String, Object> context =
- CastUtils.cast((Map<?, ?>)message.get(Message.INVOCATION_CONTEXT));
- InvocationContext invocation =
- new InvocationContext(endpoint,
- bindingOperationInfo,
- params,
- context);
- inProgress.put(key, invocation);
- }
- }
-
- /**
- * Called when a Conduit is actually required.
- *
- * @param message
- * @return the Conduit to use for mediation of the message
- */
- public Conduit selectConduit(Message message) {
- Conduit c = message.get(Conduit.class);
- if (c != null) {
- return c;
- }
- return getSelectedConduit(message);
- }
-
- /**
- * Called on completion of the MEP for which the Conduit was required.
- *
- * @param exchange represents the completed MEP
- */
- public void complete(Exchange exchange) {
- InvocationKey key = new InvocationKey(exchange);
- InvocationContext invocation = null;
- synchronized (this) {
- invocation = inProgress.get(key);
- }
- boolean failover = false;
- if (requiresFailover(exchange)) {
- Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName());
-
- Endpoint failoverTarget = getFailoverTarget(exchange, invocation);
- if (failoverTarget != null) {
- setEndpoint(failoverTarget);
- if (old != null) {
- old.close();
- conduits.remove(old);
- }
- Exception prevExchangeFault =
- (Exception)exchange.remove(Exception.class.getName());
- Message outMessage = exchange.getOutMessage();
- Exception prevMessageFault =
- outMessage.getContent(Exception.class);
- outMessage.setContent(Exception.class, null);
- overrideAddressProperty(invocation.getContext());
- Retryable retry = exchange.get(Retryable.class);
- exchange.clear();
- if (retry != null) {
- try {
- failover = true;
- long delay = getDelayBetweenRetries();
- if (delay > 0) {
- Thread.sleep(delay);
- }
- retry.invoke(invocation.getBindingOperationInfo(),
- invocation.getParams(),
- invocation.getContext(),
- exchange);
- } catch (Exception e) {
- if (exchange.get(Exception.class) != null) {
- exchange.put(Exception.class, prevExchangeFault);
- }
- if (outMessage.getContent(Exception.class) != null) {
- outMessage.setContent(Exception.class,
- prevMessageFault);
- }
- }
- }
- } else {
- setEndpoint(invocation.retrieveOriginalEndpoint(endpoint));
- }
- }
- if (!failover) {
- getLogger().fine("FAILOVER_NOT_REQUIRED");
- synchronized (this) {
- inProgress.remove(key);
- }
- super.complete(exchange);
- }
- }
-
- /**
- * @param strategy the FailoverStrategy to use
- */
- public synchronized void setStrategy(FailoverStrategy strategy) {
- getLogger().log(Level.INFO, "USING_STRATEGY", new Object[] {strategy});
- failoverStrategy = strategy;
- }
-
- /**
- * @return strategy the FailoverStrategy to use
- */
- public synchronized FailoverStrategy getStrategy() {
- if (failoverStrategy == null) {
- failoverStrategy = new SequentialStrategy();
- getLogger().log(Level.INFO,
- "USING_STRATEGY",
- new Object[] {failoverStrategy});
- }
- return failoverStrategy;
- }
-
- /**
- * @return the logger to use
- */
- protected Logger getLogger() {
- return LOG;
- }
-
- /**
- * Returns delay (in milliseconds) between retries
- * @return delay, 0 means no delay
- */
- protected long getDelayBetweenRetries() {
- FailoverStrategy strategy = getStrategy();
- if (strategy instanceof AbstractStaticFailoverStrategy) {
- return ((AbstractStaticFailoverStrategy)strategy).getDelayBetweenRetries();
- }
- //perhaps supporting FailoverTargetSelector specific property can make sense too
- return 0;
- }
-
- /**
- * Check if the exchange is suitable for a failover.
- *
- * @param exchange the current Exchange
- * @return boolean true if a failover should be attempted
- */
- protected boolean requiresFailover(Exchange exchange) {
- Message outMessage = exchange.getOutMessage();
- Exception ex = outMessage.get(Exception.class) != null
- ? outMessage.get(Exception.class)
- : exchange.get(Exception.class);
- getLogger().log(Level.FINE,
- "CHECK_LAST_INVOKE_FAILED",
- new Object[] {ex != null});
- Throwable curr = ex;
- boolean failover = false;
- while (curr != null) {
- failover = curr instanceof java.io.IOException;
- curr = curr.getCause();
- }
- if (ex != null) {
- getLogger().log(Level.INFO,
- "CHECK_FAILURE_IN_TRANSPORT",
- new Object[] {ex, failover});
- }
- return failover;
- }
-
- /**
- * Get the failover target endpoint, if a suitable one is available.
- *
- * @param exchange the current Exchange
- * @param invocation the current InvocationContext
- * @return a failover endpoint if one is available
- */
- protected Endpoint getFailoverTarget(Exchange exchange,
- InvocationContext invocation) {
- List<String> alternateAddresses = null;
- if (!invocation.hasAlternates()) {
- // no previous failover attempt on this invocation
- //
- alternateAddresses =
- getStrategy().getAlternateAddresses(exchange);
- if (alternateAddresses != null) {
- invocation.setAlternateAddresses(alternateAddresses);
- } else {
- invocation.setAlternateEndpoints(
- getStrategy().getAlternateEndpoints(exchange));
- }
- } else {
- alternateAddresses = invocation.getAlternateAddresses();
- }
-
- Endpoint failoverTarget = null;
- if (alternateAddresses != null) {
- String alternateAddress =
- getStrategy().selectAlternateAddress(alternateAddresses);
- if (alternateAddress != null) {
- // re-use current endpoint
- //
- failoverTarget = getEndpoint();
-
- failoverTarget.getEndpointInfo().setAddress(alternateAddress);
- }
- } else {
- failoverTarget = getStrategy().selectAlternateEndpoint(
- invocation.getAlternateEndpoints());
- }
- return failoverTarget;
- }
-
- /**
- * Override the ENDPOINT_ADDRESS property in the request context
- *
- * @param context the request context
- */
- protected void overrideAddressProperty(Map<String, Object> context) {
- overrideAddressProperty(context, getEndpoint().getEndpointInfo().getAddress());
- }
-
- protected void overrideAddressProperty(Map<String, Object> context,
- String address) {
- Map<String, Object> requestContext =
- CastUtils.cast((Map<?, ?>)context.get(Client.REQUEST_CONTEXT));
- if (requestContext != null) {
- requestContext.put(Message.ENDPOINT_ADDRESS, address);
- requestContext.put("javax.xml.ws.service.endpoint.address", address);
- }
- }
-
- // Some conduits may replace the endpoint address after it has already been prepared
- // but before the invocation has been done (ex, org.apache.cxf.clustering.LoadDistributorTargetSelector)
- // which may affect JAX-RS clients where actual endpoint address property may include additional path
- // segments.
- protected boolean replaceEndpointAddressPropertyIfNeeded(Message message,
- String endpointAddress,
- Conduit cond) {
- String requestURI = (String)message.get(Message.REQUEST_URI);
- if (requestURI != null && endpointAddress != null && !requestURI.startsWith(endpointAddress)) {
- String basePath = (String)message.get(Message.BASE_PATH);
- if (basePath != null && requestURI.startsWith(basePath)) {
- String pathInfo = requestURI.substring(basePath.length());
- message.put(Message.BASE_PATH, endpointAddress);
- final String slash = "/";
- boolean startsWithSlash = pathInfo.startsWith(slash);
- if (endpointAddress.endsWith(slash)) {
- endpointAddress = endpointAddress + (startsWithSlash ? pathInfo.substring(1) : pathInfo);
- } else {
- endpointAddress = endpointAddress + (startsWithSlash ? pathInfo : (slash + pathInfo));
- }
- message.put(Message.ENDPOINT_ADDRESS, endpointAddress);
-
- Exchange exchange = message.getExchange();
- InvocationKey key = new InvocationKey(exchange);
- InvocationContext invocation = inProgress.get(key);
- if (invocation != null) {
- overrideAddressProperty(invocation.getContext(),
- cond.getTarget().getAddress().getValue());
- }
- return true;
- }
- }
- return false;
- }
-
- /**
- * Used to wrap an Exchange for usage as a Map key. The raw Exchange
- * is not a suitable key type, as the hashCode is computed from its
- * current contents, which may obviously change over the lifetime of
- * an invocation.
- */
- protected static class InvocationKey {
- private Exchange exchange;
-
- InvocationKey(Exchange ex) {
- exchange = ex;
- }
-
- @Override
- public int hashCode() {
- return System.identityHashCode(exchange);
- }
-
- @Override
- public boolean equals(Object o) {
- return o instanceof InvocationKey
- && exchange == ((InvocationKey)o).exchange;
- }
- }
-
-
- /**
- * Records the context of an invocation.
- */
- protected class InvocationContext {
- private Endpoint originalEndpoint;
- private String originalAddress;
- private BindingOperationInfo bindingOperationInfo;
- private Object[] params;
- private Map<String, Object> context;
- private List<Endpoint> alternateEndpoints;
- private List<String> alternateAddresses;
-
- InvocationContext(Endpoint endpoint,
- BindingOperationInfo boi,
- Object[] prms,
- Map<String, Object> ctx) {
- originalEndpoint = endpoint;
- originalAddress = endpoint.getEndpointInfo().getAddress();
- bindingOperationInfo = boi;
- params = prms;
- context = ctx;
- }
-
- Endpoint retrieveOriginalEndpoint(Endpoint endpoint) {
- if (endpoint != originalEndpoint) {
- getLogger().log(Level.INFO,
- "REVERT_TO_ORIGINAL_TARGET",
- endpoint.getEndpointInfo().getName());
- }
- if (!endpoint.getEndpointInfo().getAddress().equals(originalAddress)) {
- endpoint.getEndpointInfo().setAddress(originalAddress);
- getLogger().log(Level.INFO,
- "REVERT_TO_ORIGINAL_ADDRESS",
- endpoint.getEndpointInfo().getAddress());
- }
- return originalEndpoint;
- }
-
- BindingOperationInfo getBindingOperationInfo() {
- return bindingOperationInfo;
- }
-
- Object[] getParams() {
- return params;
- }
-
- Map<String, Object> getContext() {
- return context;
- }
-
- List<Endpoint> getAlternateEndpoints() {
- return alternateEndpoints;
- }
-
- List<String> getAlternateAddresses() {
- return alternateAddresses;
- }
-
- void setAlternateEndpoints(List<Endpoint> alternates) {
- alternateEndpoints = alternates;
- }
-
- void setAlternateAddresses(List<String> alternates) {
- alternateAddresses = alternates;
- }
-
- boolean hasAlternates() {
- return !(alternateEndpoints == null && alternateAddresses == null);
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.clustering;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.endpoint.AbstractConduitSelector;
+import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.Endpoint;
+import org.apache.cxf.endpoint.Retryable;
+import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.service.model.BindingOperationInfo;
+import org.apache.cxf.transport.Conduit;
+
+
+/**
+ * Implements a target selection strategy based on failover to an
+ * alternate target endpoint when a transport level failure is
+ * encountered.
+ * Note that this feature changes the conduit on the fly and thus makes
+ * the Client not thread safe.
+ */
+public class FailoverTargetSelector extends AbstractConduitSelector {
+
+ private static final Logger LOG =
+ LogUtils.getL7dLogger(FailoverTargetSelector.class);
+ protected Map<InvocationKey, InvocationContext> inProgress
+ = new ConcurrentHashMap<InvocationKey, InvocationContext>();;
+ protected FailoverStrategy failoverStrategy;
+
+ /**
+ * Normal constructor.
+ */
+ public FailoverTargetSelector() {
+ super();
+ }
+
+ /**
+ * Constructor, allowing a specific conduit to override normal selection.
+ *
+ * @param c specific conduit
+ */
+ public FailoverTargetSelector(Conduit c) {
+ super(c);
+ }
+
+ /**
+ * Called prior to the interceptor chain being traversed.
+ *
+ * @param message the current Message
+ */
+ public synchronized void prepare(Message message) {
+ Exchange exchange = message.getExchange();
+ InvocationKey key = new InvocationKey(exchange);
+ if (!inProgress.containsKey(key)) {
+ Endpoint endpoint = exchange.get(Endpoint.class);
+ BindingOperationInfo bindingOperationInfo =
+ exchange.getBindingOperationInfo();
+ Object[] params = message.getContent(List.class).toArray();
+ Map<String, Object> context =
+ CastUtils.cast((Map<?, ?>)message.get(Message.INVOCATION_CONTEXT));
+ InvocationContext invocation =
+ new InvocationContext(endpoint,
+ bindingOperationInfo,
+ params,
+ context);
+ inProgress.put(key, invocation);
+ }
+ }
+
+ /**
+ * Called when a Conduit is actually required.
+ *
+ * @param message the current Message
+ * @return the Conduit to use for mediation of the message
+ */
+ public Conduit selectConduit(Message message) {
+ Conduit c = message.get(Conduit.class);
+ if (c != null) {
+ return c;
+ }
+ return getSelectedConduit(message);
+ }
+
+ /**
+ * Called on completion of the MEP for which the Conduit was required.
+ *
+ * @param exchange represents the completed MEP
+ */
+ public void complete(Exchange exchange) {
+ InvocationKey key = new InvocationKey(exchange);
+ InvocationContext invocation = null;
+ synchronized (this) {
+ invocation = inProgress.get(key);
+ }
+ boolean failover = false;
+ if (requiresFailover(exchange)) {
+ Conduit old = (Conduit)exchange.getOutMessage().remove(Conduit.class.getName());
+
+ Endpoint failoverTarget = getFailoverTarget(exchange, invocation);
+ if (failoverTarget != null) {
+ setEndpoint(failoverTarget);
+ if (old != null) {
+ old.close();
+ conduits.remove(old);
+ }
+ Exception prevExchangeFault =
+ (Exception)exchange.remove(Exception.class.getName());
+ Message outMessage = exchange.getOutMessage();
+ Exception prevMessageFault =
+ outMessage.getContent(Exception.class);
+ outMessage.setContent(Exception.class, null);
+ overrideAddressProperty(invocation.getContext());
+ Retryable retry = exchange.get(Retryable.class);
+ exchange.clear();
+ if (retry != null) {
+ try {
+ failover = true;
+ long delay = getDelayBetweenRetries();
+ if (delay > 0) {
+ Thread.sleep(delay);
+ }
+ retry.invoke(invocation.getBindingOperationInfo(),
+ invocation.getParams(),
+ invocation.getContext(),
+ exchange);
+ } catch (Exception e) {
+ if (exchange.get(Exception.class) != null) {
+ exchange.put(Exception.class, prevExchangeFault);
+ }
+ if (outMessage.getContent(Exception.class) != null) {
+ outMessage.setContent(Exception.class,
+ prevMessageFault);
+ }
+ }
+ }
+ } else {
+ setEndpoint(invocation.retrieveOriginalEndpoint(endpoint));
+ }
+ }
+ if (!failover) {
+ getLogger().fine("FAILOVER_NOT_REQUIRED");
+ synchronized (this) {
+ inProgress.remove(key);
+ }
+ super.complete(exchange);
+ }
+ }
+
+ /**
+ * @param strategy the FailoverStrategy to use
+ */
+ public synchronized void setStrategy(FailoverStrategy strategy) {
+ getLogger().log(Level.INFO, "USING_STRATEGY", new Object[] {strategy});
+ failoverStrategy = strategy;
+ }
+
+ /**
+ * @return the FailoverStrategy in use; a SequentialStrategy is created if none was set
+ */
+ public synchronized FailoverStrategy getStrategy() {
+ if (failoverStrategy == null) {
+ failoverStrategy = new SequentialStrategy();
+ getLogger().log(Level.INFO,
+ "USING_STRATEGY",
+ new Object[] {failoverStrategy});
+ }
+ return failoverStrategy;
+ }
+
+ /**
+ * @return the logger to use
+ */
+ protected Logger getLogger() {
+ return LOG;
+ }
+
+ /**
+ * Returns delay (in milliseconds) between retries
+ * @return delay, 0 means no delay
+ */
+ protected long getDelayBetweenRetries() {
+ FailoverStrategy strategy = getStrategy();
+ if (strategy instanceof AbstractStaticFailoverStrategy) {
+ return ((AbstractStaticFailoverStrategy)strategy).getDelayBetweenRetries();
+ }
+ //perhaps supporting FailoverTargetSelector specific property can make sense too
+ return 0;
+ }
+
+ /**
+ * Check if the exchange is suitable for a failover.
+ *
+ * @param exchange the current Exchange
+ * @return boolean true if a failover should be attempted
+ */
+ protected boolean requiresFailover(Exchange exchange) {
+ Message outMessage = exchange.getOutMessage();
+ Exception ex = outMessage.get(Exception.class) != null
+ ? outMessage.get(Exception.class)
+ : exchange.get(Exception.class);
+ getLogger().log(Level.FINE,
+ "CHECK_LAST_INVOKE_FAILED",
+ new Object[] {ex != null});
+ Throwable curr = ex;
+ boolean failover = false;
+ while (curr != null) {
+ failover = curr instanceof java.io.IOException;
+ curr = curr.getCause();
+ }
+ if (ex != null) {
+ getLogger().log(Level.INFO,
+ "CHECK_FAILURE_IN_TRANSPORT",
+ new Object[] {ex, failover});
+ }
+ return failover;
+ }
+
+ /**
+ * Get the failover target endpoint, if a suitable one is available.
+ *
+ * @param exchange the current Exchange
+ * @param invocation the current InvocationContext
+ * @return a failover endpoint if one is available
+ */
+ protected Endpoint getFailoverTarget(Exchange exchange,
+ InvocationContext invocation) {
+ List<String> alternateAddresses = null;
+ if (!invocation.hasAlternates()) {
+ // no previous failover attempt on this invocation
+ //
+ alternateAddresses =
+ getStrategy().getAlternateAddresses(exchange);
+ if (alternateAddresses != null) {
+ invocation.setAlternateAddresses(alternateAddresses);
+ } else {
+ invocation.setAlternateEndpoints(
+ getStrategy().getAlternateEndpoints(exchange));
+ }
+ } else {
+ alternateAddresses = invocation.getAlternateAddresses();
+ }
+
+ Endpoint failoverTarget = null;
+ if (alternateAddresses != null) {
+ String alternateAddress =
+ getStrategy().selectAlternateAddress(alternateAddresses);
+ if (alternateAddress != null) {
+ // re-use current endpoint
+ //
+ failoverTarget = getEndpoint();
+
+ failoverTarget.getEndpointInfo().setAddress(alternateAddress);
+ }
+ } else {
+ failoverTarget = getStrategy().selectAlternateEndpoint(
+ invocation.getAlternateEndpoints());
+ }
+ return failoverTarget;
+ }
+
+ /**
+ * Override the ENDPOINT_ADDRESS property in the request context with the current endpoint address.
+ *
+ * @param context the invocation context holding the request context
+ */
+ protected void overrideAddressProperty(Map<String, Object> context) {
+ overrideAddressProperty(context, getEndpoint().getEndpointInfo().getAddress());
+ }
+
+ protected void overrideAddressProperty(Map<String, Object> context,
+ String address) {
+ Map<String, Object> requestContext =
+ CastUtils.cast((Map<?, ?>)context.get(Client.REQUEST_CONTEXT));
+ if (requestContext != null) {
+ requestContext.put(Message.ENDPOINT_ADDRESS, address);
+ requestContext.put("javax.xml.ws.service.endpoint.address", address);
+ }
+ }
+
+ // Some conduit selectors may replace the endpoint address after it has already been prepared
+ // but before the invocation has been done (e.g. org.apache.cxf.clustering.LoadDistributorTargetSelector),
+ // which may affect JAX-RS clients where actual endpoint address property may include additional path
+ // segments.
+ protected boolean replaceEndpointAddressPropertyIfNeeded(Message message,
+ String endpointAddress,
+ Conduit cond) {
+ String requestURI = (String)message.get(Message.REQUEST_URI);
+ if (requestURI != null && endpointAddress != null && !requestURI.startsWith(endpointAddress)) {
+ String basePath = (String)message.get(Message.BASE_PATH);
+ if (basePath != null && requestURI.startsWith(basePath)) {
+ String pathInfo = requestURI.substring(basePath.length());
+ message.put(Message.BASE_PATH, endpointAddress);
+ final String slash = "/";
+ boolean startsWithSlash = pathInfo.startsWith(slash);
+ if (endpointAddress.endsWith(slash)) {
+ endpointAddress = endpointAddress + (startsWithSlash ? pathInfo.substring(1) : pathInfo);
+ } else {
+ endpointAddress = endpointAddress + (startsWithSlash ? pathInfo : (slash + pathInfo));
+ }
+ message.put(Message.ENDPOINT_ADDRESS, endpointAddress);
+
+ Exchange exchange = message.getExchange();
+ InvocationKey key = new InvocationKey(exchange);
+ InvocationContext invocation = inProgress.get(key);
+ if (invocation != null) {
+ overrideAddressProperty(invocation.getContext(),
+ cond.getTarget().getAddress().getValue());
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Used to wrap an Exchange for usage as a Map key. The raw Exchange
+ * is not a suitable key type, as the hashCode is computed from its
+ * current contents, which may obviously change over the lifetime of
+ * an invocation.
+ */
+ protected static class InvocationKey {
+ private Exchange exchange;
+
+ InvocationKey(Exchange ex) {
+ exchange = ex;
+ }
+
+ @Override
+ public int hashCode() {
+ return System.identityHashCode(exchange);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof InvocationKey
+ && exchange == ((InvocationKey)o).exchange;
+ }
+ }
+
+
+ /**
+ * Records the context of an invocation.
+ */
+ protected class InvocationContext {
+ private Endpoint originalEndpoint;
+ private String originalAddress;
+ private BindingOperationInfo bindingOperationInfo;
+ private Object[] params;
+ private Map<String, Object> context;
+ private List<Endpoint> alternateEndpoints;
+ private List<String> alternateAddresses;
+
+ InvocationContext(Endpoint endpoint,
+ BindingOperationInfo boi,
+ Object[] prms,
+ Map<String, Object> ctx) {
+ originalEndpoint = endpoint;
+ originalAddress = endpoint.getEndpointInfo().getAddress();
+ bindingOperationInfo = boi;
+ params = prms;
+ context = ctx;
+ }
+
+ Endpoint retrieveOriginalEndpoint(Endpoint endpoint) {
+ if (endpoint != originalEndpoint) {
+ getLogger().log(Level.INFO,
+ "REVERT_TO_ORIGINAL_TARGET",
+ endpoint.getEndpointInfo().getName());
+ }
+ if (!endpoint.getEndpointInfo().getAddress().equals(originalAddress)) {
+ endpoint.getEndpointInfo().setAddress(originalAddress);
+ getLogger().log(Level.INFO,
+ "REVERT_TO_ORIGINAL_ADDRESS",
+ endpoint.getEndpointInfo().getAddress());
+ }
+ return originalEndpoint;
+ }
+
+ BindingOperationInfo getBindingOperationInfo() {
+ return bindingOperationInfo;
+ }
+
+ Object[] getParams() {
+ return params;
+ }
+
+ Map<String, Object> getContext() {
+ return context;
+ }
+
+ List<Endpoint> getAlternateEndpoints() {
+ return alternateEndpoints;
+ }
+
+ List<String> getAlternateAddresses() {
+ return alternateAddresses;
+ }
+
+ void setAlternateEndpoints(List<Endpoint> alternates) {
+ alternateEndpoints = alternates;
+ }
+
+ void setAlternateAddresses(List<String> alternates) {
+ alternateAddresses = alternates;
+ }
+
+ boolean hasAlternates() {
+ return !(alternateEndpoints == null && alternateAddresses == null);
+ }
+ }
+}
Modified: cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java?rev=1310446&r1=1310445&r2=1310446&view=diff
==============================================================================
--- cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java (original)
+++ cxf/trunk/rt/features/clustering/src/main/java/org/apache/cxf/clustering/LoadDistributorFeature.java Fri Apr 6 16:21:14 2012
@@ -1,36 +1,36 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.clustering;
-
-import org.apache.cxf.common.injection.NoJSR250Annotations;
-
-/**
- * This feature may be applied to a Client so as to enable
- * load distribution amongst a set of target endpoints or addresses
- * Note that this feature changes the conduit on the fly and thus makes
- * the Client not thread safe.
- */
-@NoJSR250Annotations
-public class LoadDistributorFeature extends FailoverFeature {
-
- @Override
- public FailoverTargetSelector getTargetSelector() {
- return new LoadDistributorTargetSelector();
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.clustering;
+
+import org.apache.cxf.common.injection.NoJSR250Annotations;
+
+/**
+ * This feature may be applied to a Client so as to enable
+ * load distribution amongst a set of target endpoints or addresses.
+ * Note that this feature changes the conduit on the fly and thus makes
+ * the Client not thread safe.
+ */
+@NoJSR250Annotations
+public class LoadDistributorFeature extends FailoverFeature {
+
+ @Override
+ public FailoverTargetSelector getTargetSelector() {
+ return new LoadDistributorTargetSelector();
+ }
+}