You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2017/04/28 15:56:14 UTC
camel git commit: CAMEL-11212 Don't allow Salesforce HTTP client ...
Repository: camel
Updated Branches:
refs/heads/master 626ef3203 -> 3505e718d
CAMEL-11212 Don't allow Salesforce HTTP client ...
...to stop with outstanding requests
This introduces a `Phaser` that waits for any in-flight requests to
complete before the client is allowed to stop.
Stopping should not block indefinitely as long as
`BufferingResponseListener::onComplete` is invoked at the end of every
request, which is guaranteed by the `CompleteListener::onComplete`
contract.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3505e718
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3505e718
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3505e718
Branch: refs/heads/master
Commit: 3505e718db48cc0d8be5b47f6c4030c958d93a5f
Parents: 626ef32
Author: Zoran Regvart <zr...@apache.org>
Authored: Thu Apr 27 14:40:18 2017 +0200
Committer: Zoran Regvart <zr...@apache.org>
Committed: Fri Apr 28 17:55:54 2017 +0200
----------------------------------------------------------------------
.../internal/client/AbstractClientBase.java | 110 +++++++++++------
.../internal/client/AbstractClientBaseTest.java | 121 +++++++++++++++++++
2 files changed, 191 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3505e718/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
index af96dcd..26a86d9 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -21,7 +21,9 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
@@ -55,6 +57,8 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
protected static final String APPLICATION_JSON_UTF8 = "application/json;charset=utf-8";
protected static final String APPLICATION_XML_UTF8 = "application/xml;charset=utf-8";
+ private static final int DEFAULT_TERMINATION_TIMEOUT = 10;
+
protected final Logger log = LoggerFactory.getLogger(getClass());
protected final SalesforceHttpClient httpClient;
@@ -64,12 +68,22 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
protected String accessToken;
protected String instanceUrl;
+ private Phaser inflightRequests;
+
+ private long terminationTimeout;
+
+ /**
+ * Creates a client that uses the default termination timeout
+ * (DEFAULT_TERMINATION_TIMEOUT, in seconds) when waiting for in-flight
+ * requests in stop(). Delegates to the constructor that takes an explicit
+ * termination timeout.
+ */
public AbstractClientBase(String version, SalesforceSession session,
- SalesforceHttpClient httpClient) throws SalesforceException {
+ SalesforceHttpClient httpClient) throws SalesforceException {
+ this(version, session, httpClient, DEFAULT_TERMINATION_TIMEOUT);
+ }
+
+ /**
+ * Creates a client with an explicit termination timeout; package-private,
+ * the test in this commit uses it to shorten the wait.
+ *
+ * @param terminationTimeout maximum time, in seconds, that stop() waits
+ * for in-flight requests to complete
+ */
+ AbstractClientBase(String version, SalesforceSession session,
+ SalesforceHttpClient httpClient, int terminationTimeout) throws SalesforceException {
this.version = version;
this.session = session;
this.httpClient = httpClient;
+ this.terminationTimeout = terminationTimeout;
}
public void start() throws Exception {
@@ -83,10 +97,21 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
// also register this client as a session listener
session.addListener(this);
+
+ inflightRequests = new Phaser(1);
}
+    /**
+     * Stops this client. Waits, for at most the configured termination
+     * timeout (in seconds), until all in-flight requests have completed and
+     * then removes this client from the session's listeners.
+     * <p>
+     * If the waiting thread is interrupted its interrupt status is restored
+     * and stopping continues; if the wait times out a warning is logged and
+     * stopping continues.
+     */
@Override
public void stop() throws Exception {
+        // the phaser is created in start(), so it is absent if the client was never started
+        final Phaser requests = inflightRequests;
+        if (requests != null) {
+            // arrive() returns the phase this client arrived at, or a negative
+            // value if the phaser has already terminated
+            final int phase = requests.arrive();
+            if (phase >= 0) {
+                try {
+                    // wait for that very phase to advance instead of assuming phase 0
+                    requests.awaitAdvanceInterruptibly(phase, terminationTimeout, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    // do not lose the interruption: restore the flag for the caller
+                    Thread.currentThread().interrupt();
+                } catch (TimeoutException e) {
+                    log.warn("Timed out after {} seconds waiting for in-flight requests to complete",
+                        terminationTimeout);
+                }
+            }
+        }
+
// deregister listener
session.removeListener(this);
}
@@ -134,59 +159,64 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
buffers.clear();
}
+ inflightRequests.register();
// execute the request
request.send(new BufferingResponseListener(httpClient.getMaxContentLength()) {
@Override
public void onComplete(Result result) {
- Response response = result.getResponse();
- if (result.isFailed()) {
-
- // Failure!!!
- // including Salesforce errors reported as exception from SalesforceSecurityHandler
- Throwable failure = result.getFailure();
- if (failure instanceof SalesforceException) {
- callback.onResponse(null, (SalesforceException) failure);
+ try {
+ Response response = result.getResponse();
+ if (result.isFailed()) {
+
+ // Failure!!!
+ // including Salesforce errors reported as exception from SalesforceSecurityHandler
+ Throwable failure = result.getFailure();
+ if (failure instanceof SalesforceException) {
+ callback.onResponse(null, (SalesforceException) failure);
+ } else {
+ final String msg = String.format("Unexpected error {%s:%s} executing {%s:%s}",
+ response.getStatus(), response.getReason(), request.getMethod(), request.getURI());
+ callback.onResponse(null, new SalesforceException(msg, response.getStatus(), failure));
+ }
} else {
- final String msg = String.format("Unexpected error {%s:%s} executing {%s:%s}",
- response.getStatus(), response.getReason(), request.getMethod(), request.getURI());
- callback.onResponse(null, new SalesforceException(msg, response.getStatus(), failure));
- }
- } else {
- // HTTP error status
- final int status = response.getStatus();
- SalesforceHttpRequest request = (SalesforceHttpRequest) ((SalesforceHttpRequest) result.getRequest())
- .getConversation()
- .getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE);
+ // HTTP error status
+ final int status = response.getStatus();
+ SalesforceHttpRequest request = (SalesforceHttpRequest) ((SalesforceHttpRequest) result.getRequest())
+ .getConversation()
+ .getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE);
- if (status == HttpStatus.BAD_REQUEST_400 && request != null) {
- // parse login error
- ContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
- try {
+ if (status == HttpStatus.BAD_REQUEST_400 && request != null) {
+ // parse login error
+ ContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
+ try {
- session.parseLoginResponse(contentResponse, getContentAsString());
- final String msg = String.format("Unexpected Error {%s:%s} executing {%s:%s}",
- status, response.getReason(), request.getMethod(), request.getURI());
- callback.onResponse(null, new SalesforceException(msg, null));
+ session.parseLoginResponse(contentResponse, getContentAsString());
+ final String msg = String.format("Unexpected Error {%s:%s} executing {%s:%s}",
+ status, response.getReason(), request.getMethod(), request.getURI());
+ callback.onResponse(null, new SalesforceException(msg, null));
- } catch (SalesforceException e) {
+ } catch (SalesforceException e) {
- final String msg = String.format("Error {%s:%s} executing {%s:%s}",
- status, response.getReason(), request.getMethod(), request.getURI());
- callback.onResponse(null, new SalesforceException(msg, response.getStatus(), e));
+ final String msg = String.format("Error {%s:%s} executing {%s:%s}",
+ status, response.getReason(), request.getMethod(), request.getURI());
+ callback.onResponse(null, new SalesforceException(msg, response.getStatus(), e));
- }
- } else if (status < HttpStatus.OK_200 || status >= HttpStatus.MULTIPLE_CHOICES_300) {
- // Salesforce HTTP failure!
- final SalesforceException exception = createRestException(response, getContentAsInputStream());
+ }
+ } else if (status < HttpStatus.OK_200 || status >= HttpStatus.MULTIPLE_CHOICES_300) {
+ // Salesforce HTTP failure!
+ final SalesforceException exception = createRestException(response, getContentAsInputStream());
- // for APIs that return body on status 400, such as Composite API we need content as well
- callback.onResponse(getContentAsInputStream(), exception);
- } else {
+ // for APIs that return body on status 400, such as Composite API we need content as well
+ callback.onResponse(getContentAsInputStream(), exception);
+ } else {
- // Success!!!
- callback.onResponse(getContentAsInputStream(), null);
+ // Success!!!
+ callback.onResponse(getContentAsInputStream(), null);
+ }
}
+ } finally {
+ inflightRequests.arriveAndDeregister();
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3505e718/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
new file mode 100644
index 0000000..9ad3e8e
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import java.io.InputStream;
+
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.eclipse.jetty.client.HttpConversation;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Response.CompleteListener;
+import org.eclipse.jetty.client.api.Result;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests that stopping an {@link AbstractClientBase} returns promptly when no
+ * request is in flight and that the wait for an unfinished request is bounded
+ * by the termination timeout.
+ */
+public class AbstractClientBaseTest {
+
+    /** Termination timeout of the client under test, in seconds. */
+    private static final int TERMINATION_TIMEOUT_SECONDS = 1;
+
+    /** The same timeout in milliseconds, used by the timing assertions. */
+    private static final long TERMINATION_TIMEOUT_MILLIS = TERMINATION_TIMEOUT_SECONDS * 1000L;
+
+    /** Minimal concrete client; its abstract hooks do nothing of interest. */
+    static class Client extends AbstractClientBase {
+        Client(final SalesforceSession session) throws SalesforceException {
+            super(null, session, mock(SalesforceHttpClient.class), TERMINATION_TIMEOUT_SECONDS);
+        }
+
+        @Override
+        protected SalesforceException createRestException(final Response response, final InputStream responseContent) {
+            return null;
+        }
+
+        @Override
+        protected void setAccessToken(final Request request) {
+        }
+
+    }
+
+    SalesforceSession session = mock(SalesforceSession.class);
+
+    // JUnit 4 instantiates the test class once per test method, so every test
+    // works with its own client, started in startClient()
+    final Client client;
+
+    public AbstractClientBaseTest() throws SalesforceException {
+        client = new Client(session);
+
+        when(session.getAccessToken()).thenReturn("token");
+    }
+
+    @Before
+    public void startClient() throws Exception {
+        client.start();
+    }
+
+    @Test
+    public void shouldNotHangIfRequestsHaveFinished() throws Exception {
+        final Request request = mock(Request.class);
+        final ArgumentCaptor<CompleteListener> listener = ArgumentCaptor.forClass(CompleteListener.class);
+
+        doNothing().when(request).send(listener.capture());
+
+        client.doHttpRequest(request, (response, exception) -> {
+        });
+
+        final Result result = mock(Result.class);
+        final Response response = mock(Response.class);
+        when(result.getResponse()).thenReturn(response);
+
+        final SalesforceHttpRequest salesforceRequest = mock(SalesforceHttpRequest.class);
+        when(result.getRequest()).thenReturn(salesforceRequest);
+
+        final HttpConversation conversation = mock(HttpConversation.class);
+        when(salesforceRequest.getConversation()).thenReturn(conversation);
+
+        when(conversation.getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE))
+            .thenReturn(salesforceRequest);
+
+        // completes the request
+        listener.getValue().onComplete(result);
+
+        // nanoTime is monotonic, unlike the wall clock
+        final long stopStart = System.nanoTime();
+        // should not wait
+        client.stop();
+
+        final long elapsed = millisSince(stopStart);
+        // anything well below the timeout proves stop() did not wait for it;
+        // a bound of a few milliseconds would fail on a busy build machine
+        assertTrue("stop() took " + elapsed + " ms although no request was in flight",
+            elapsed < TERMINATION_TIMEOUT_MILLIS / 2);
+    }
+
+    @Test
+    public void shouldTimeoutWhenRequestsAreStillOngoing() throws Exception {
+        client.doHttpRequest(mock(Request.class), (response, exception) -> {
+        });
+
+        // the request never completes
+
+        final long stopStart = System.nanoTime();
+        // will wait for the termination timeout
+        client.stop();
+
+        final long elapsed = millisSince(stopStart);
+        assertTrue("stop() returned after " + elapsed + " ms, before the termination timeout",
+            elapsed >= TERMINATION_TIMEOUT_MILLIS - 100);
+        // generous upper bound: only guards against stop() hanging
+        assertTrue("stop() took " + elapsed + " ms, far longer than the termination timeout",
+            elapsed < 3 * TERMINATION_TIMEOUT_MILLIS);
+    }
+
+    /** Returns the milliseconds elapsed since {@code startNanos}, a {@link System#nanoTime()} reading. */
+    private static long millisSince(final long startNanos) {
+        return (System.nanoTime() - startNanos) / 1_000_000L;
+    }
+}