You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2017/04/28 15:56:14 UTC

camel git commit: CAMEL-11212 Don't allow Salesforce HTTP client ...

Repository: camel
Updated Branches:
  refs/heads/master 626ef3203 -> 3505e718d


CAMEL-11212 Don't allow Salesforce HTTP client ...

...to stop with outstanding requests

This introduces a `Phaser` that waits for any in-flight requests to
complete before the client is allowed to stop.

Stopping the client should not block as long as
`BufferingResponseListener::onComplete` is invoked at the end of every
request, which is guaranteed by the `CompleteListener::onComplete`
contract. In any case, the wait is bounded by a termination timeout
(10 seconds by default).


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3505e718
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3505e718
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3505e718

Branch: refs/heads/master
Commit: 3505e718db48cc0d8be5b47f6c4030c958d93a5f
Parents: 626ef32
Author: Zoran Regvart <zr...@apache.org>
Authored: Thu Apr 27 14:40:18 2017 +0200
Committer: Zoran Regvart <zr...@apache.org>
Committed: Fri Apr 28 17:55:54 2017 +0200

----------------------------------------------------------------------
 .../internal/client/AbstractClientBase.java     | 110 +++++++++++------
 .../internal/client/AbstractClientBaseTest.java | 121 +++++++++++++++++++
 2 files changed, 191 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3505e718/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
index af96dcd..26a86d9 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBase.java
@@ -21,7 +21,9 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
@@ -55,6 +57,8 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
     protected static final String APPLICATION_JSON_UTF8 = "application/json;charset=utf-8";
     protected static final String APPLICATION_XML_UTF8 = "application/xml;charset=utf-8";
 
+    private static final int DEFAULT_TERMINATION_TIMEOUT = 10;
+
     protected final Logger log = LoggerFactory.getLogger(getClass());
 
     protected final SalesforceHttpClient httpClient;
@@ -64,12 +68,22 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
     protected String accessToken;
     protected String instanceUrl;
 
+    private Phaser inflightRequests;
+
+    private long terminationTimeout;
+
     public AbstractClientBase(String version, SalesforceSession session,
-                              SalesforceHttpClient httpClient) throws SalesforceException {
+        SalesforceHttpClient httpClient) throws SalesforceException {
+        this(version, session, httpClient, DEFAULT_TERMINATION_TIMEOUT);
+    }
+
+    AbstractClientBase(String version, SalesforceSession session,
+                              SalesforceHttpClient httpClient, int terminationTimeout) throws SalesforceException {
 
         this.version = version;
         this.session = session;
         this.httpClient = httpClient;
+        this.terminationTimeout = terminationTimeout;
     }
 
     public void start() throws Exception {
@@ -83,10 +97,21 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
 
         // also register this client as a session listener
         session.addListener(this);
+
+        inflightRequests = new Phaser(1);
     }
 
     @Override
     public void stop() throws Exception {
+        inflightRequests.arrive();
+        if (!inflightRequests.isTerminated()) {
+            try {
+                inflightRequests.awaitAdvanceInterruptibly(0, terminationTimeout, TimeUnit.SECONDS);
+            } catch (InterruptedException | TimeoutException ignored) {
+                // exception is ignored
+            }
+        }
+
         // deregister listener
         session.removeListener(this);
     }
@@ -134,59 +159,64 @@ public abstract class AbstractClientBase implements SalesforceSession.Salesforce
             buffers.clear();
         }
 
+        inflightRequests.register();
         // execute the request
         request.send(new BufferingResponseListener(httpClient.getMaxContentLength()) {
             @Override
             public void onComplete(Result result) {
-                Response response = result.getResponse();
-                if (result.isFailed()) {
-
-                    // Failure!!!
-                    // including Salesforce errors reported as exception from SalesforceSecurityHandler
-                    Throwable failure = result.getFailure();
-                    if (failure instanceof SalesforceException) {
-                        callback.onResponse(null, (SalesforceException) failure);
+                try {
+                    Response response = result.getResponse();
+                    if (result.isFailed()) {
+
+                        // Failure!!!
+                        // including Salesforce errors reported as exception from SalesforceSecurityHandler
+                        Throwable failure = result.getFailure();
+                        if (failure instanceof SalesforceException) {
+                            callback.onResponse(null, (SalesforceException) failure);
+                        } else {
+                            final String msg = String.format("Unexpected error {%s:%s} executing {%s:%s}",
+                                response.getStatus(), response.getReason(), request.getMethod(), request.getURI());
+                            callback.onResponse(null, new SalesforceException(msg, response.getStatus(), failure));
+                        }
                     } else {
-                        final String msg = String.format("Unexpected error {%s:%s} executing {%s:%s}",
-                            response.getStatus(), response.getReason(), request.getMethod(), request.getURI());
-                        callback.onResponse(null, new SalesforceException(msg, response.getStatus(), failure));
-                    }
-                } else {
 
-                    // HTTP error status
-                    final int status = response.getStatus();
-                    SalesforceHttpRequest request = (SalesforceHttpRequest) ((SalesforceHttpRequest) result.getRequest())
-                        .getConversation()
-                        .getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE);
+                        // HTTP error status
+                        final int status = response.getStatus();
+                        SalesforceHttpRequest request = (SalesforceHttpRequest) ((SalesforceHttpRequest) result.getRequest())
+                            .getConversation()
+                            .getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE);
 
-                    if (status == HttpStatus.BAD_REQUEST_400 && request != null) {
-                        // parse login error
-                        ContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
-                        try {
+                        if (status == HttpStatus.BAD_REQUEST_400 && request != null) {
+                            // parse login error
+                            ContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
+                            try {
 
-                            session.parseLoginResponse(contentResponse, getContentAsString());
-                            final String msg = String.format("Unexpected Error {%s:%s} executing {%s:%s}",
-                                status, response.getReason(), request.getMethod(), request.getURI());
-                            callback.onResponse(null, new SalesforceException(msg, null));
+                                session.parseLoginResponse(contentResponse, getContentAsString());
+                                final String msg = String.format("Unexpected Error {%s:%s} executing {%s:%s}",
+                                    status, response.getReason(), request.getMethod(), request.getURI());
+                                callback.onResponse(null, new SalesforceException(msg, null));
 
-                        } catch (SalesforceException e) {
+                            } catch (SalesforceException e) {
 
-                            final String msg = String.format("Error {%s:%s} executing {%s:%s}",
-                                status, response.getReason(), request.getMethod(), request.getURI());
-                            callback.onResponse(null, new SalesforceException(msg, response.getStatus(), e));
+                                final String msg = String.format("Error {%s:%s} executing {%s:%s}",
+                                    status, response.getReason(), request.getMethod(), request.getURI());
+                                callback.onResponse(null, new SalesforceException(msg, response.getStatus(), e));
 
-                        }
-                    } else if (status < HttpStatus.OK_200 || status >= HttpStatus.MULTIPLE_CHOICES_300) {
-                        // Salesforce HTTP failure!
-                        final SalesforceException exception = createRestException(response, getContentAsInputStream());
+                            }
+                        } else if (status < HttpStatus.OK_200 || status >= HttpStatus.MULTIPLE_CHOICES_300) {
+                            // Salesforce HTTP failure!
+                            final SalesforceException exception = createRestException(response, getContentAsInputStream());
 
-                        // for APIs that return body on status 400, such as Composite API we need content as well
-                        callback.onResponse(getContentAsInputStream(), exception);
-                    } else {
+                            // for APIs that return body on status 400, such as Composite API we need content as well
+                            callback.onResponse(getContentAsInputStream(), exception);
+                        } else {
 
-                        // Success!!!
-                        callback.onResponse(getContentAsInputStream(), null);
+                            // Success!!!
+                            callback.onResponse(getContentAsInputStream(), null);
+                        }
                     }
+                } finally {
+                    inflightRequests.arriveAndDeregister();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3505e718/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
new file mode 100644
index 0000000..9ad3e8e
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/client/AbstractClientBaseTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.client;
+
+import java.io.InputStream;
+
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.api.SalesforceException;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.eclipse.jetty.client.HttpConversation;
+import org.eclipse.jetty.client.api.Request;
+import org.eclipse.jetty.client.api.Response;
+import org.eclipse.jetty.client.api.Response.CompleteListener;
+import org.eclipse.jetty.client.api.Result;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AbstractClientBaseTest {
+    static class Client extends AbstractClientBase {
+        Client(final SalesforceSession session) throws SalesforceException {
+            super(null, session, mock(SalesforceHttpClient.class),
+                1 /* 1 second termination timeout */);
+        }
+
+        @Override
+        protected SalesforceException createRestException(final Response response, final InputStream responseContent) {
+            return null;
+        }
+
+        @Override
+        protected void setAccessToken(final Request request) {
+        }
+
+    }
+
+    SalesforceSession session = mock(SalesforceSession.class);
+
+    // having client as a field also tests that the same client instance can be
+    // stopped and started again
+    final Client client;
+
+    public AbstractClientBaseTest() throws SalesforceException {
+        client = new Client(session);
+
+        when(session.getAccessToken()).thenReturn("token");
+    }
+
+    @Before
+    public void startClient() throws Exception {
+        client.start();
+    }
+
+    @Test
+    public void shouldNotHangIfRequestsHaveFinished() throws Exception {
+        final Request request = mock(Request.class);
+        final ArgumentCaptor<CompleteListener> listener = ArgumentCaptor.forClass(CompleteListener.class);
+
+        doNothing().when(request).send(listener.capture());
+
+        client.doHttpRequest(request, (response, exception) -> {
+        });
+
+        final Result result = mock(Result.class);
+        final Response response = mock(Response.class);
+        when(result.getResponse()).thenReturn(response);
+
+        final SalesforceHttpRequest salesforceRequest = mock(SalesforceHttpRequest.class);
+        when(result.getRequest()).thenReturn(salesforceRequest);
+
+        final HttpConversation conversation = mock(HttpConversation.class);
+        when(salesforceRequest.getConversation()).thenReturn(conversation);
+
+        when(conversation.getAttribute(SalesforceSecurityHandler.AUTHENTICATION_REQUEST_ATTRIBUTE))
+            .thenReturn(salesforceRequest);
+
+        // completes the request
+        listener.getValue().onComplete(result);
+
+        final long stopStartTime = System.currentTimeMillis();
+        // should not wait
+        client.stop();
+
+        final long elapsed = System.currentTimeMillis() - stopStartTime;
+        assertTrue(elapsed < 10);
+    }
+
+    @Test
+    public void shouldTimeoutWhenRequestsAreStillOngoing() throws Exception {
+        client.doHttpRequest(mock(Request.class), (response, exception) -> {
+        });
+
+        // the request never completes
+
+        final long stopStartTime = System.currentTimeMillis();
+        // will wait for 1 second
+        client.stop();
+
+        final long elapsed = System.currentTimeMillis() - stopStartTime;
+        assertTrue(elapsed > 900 && elapsed < 1100);
+    }
+}