You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by zr...@apache.org on 2017/02/17 14:27:09 UTC

camel git commit: CAMEL-10848 Salesforce: configure initialReplay...

Repository: camel
Updated Branches:
  refs/heads/master 1d288e080 -> e682e7ee0


CAMEL-10848 Salesforce: configure initialReplay...

...IdMap requires keys to be prefixed with "/topic"

This commit changes the way replayIds are looked up in
initialReplayIdMap while maintaining backward compatibility.

The way you needed to configure the initialReplayIdMap previously
exposed the internals of how CometD subscriptions operate, you had to
start with `/topic/` and then add your topic name, whilist when
configuring the endpoint you would configure just the topic name.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e682e7ee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e682e7ee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e682e7ee

Branch: refs/heads/master
Commit: e682e7ee0d3a0e0487777c80257cf421adcb182d
Parents: 1d288e0
Author: Zoran Regvart <zr...@apache.org>
Authored: Fri Feb 17 15:24:39 2017 +0100
Committer: Zoran Regvart <zr...@apache.org>
Committed: Fri Feb 17 15:24:39 2017 +0100

----------------------------------------------------------------------
 .../internal/streaming/SubscriptionHelper.java  |  18 +--
 .../streaming/SubscriptionHelperTest.java       | 114 +++++++++++++++++++
 2 files changed, 124 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e682e7ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 411caa1..a467fdb 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -65,9 +65,10 @@ public class SubscriptionHelper extends ServiceSupport {
     private static final String EXCEPTION_FIELD = "exception";
     private static final int DISCONNECT_INTERVAL = 5000;
 
+    final BayeuxClient client;
+
     private final SalesforceComponent component;
     private final SalesforceSession session;
-    private final BayeuxClient client;
     private final long timeout = 60 * 1000L;
 
     private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
@@ -93,7 +94,7 @@ public class SubscriptionHelper extends ServiceSupport {
         this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>();
 
         // create CometD client
-        this.client = createClient(topicName);
+        this.client = createClient(component, topicName);
 
         restartBackoff = new AtomicLong(0);
         backoffIncrement = component.getConfig().getBackoffIncrement();
@@ -312,17 +313,18 @@ public class SubscriptionHelper extends ServiceSupport {
 
         boolean disconnected = client.disconnect(timeout);
         if (!disconnected) {
-            LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(), timeout);
+            LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(component), timeout);
         }
     }
 
-    private BayeuxClient createClient(String topicName) throws Exception {
+    static BayeuxClient createClient(final SalesforceComponent component, final String topicName) throws Exception {
         // use default Jetty client from SalesforceComponent, its shared by all consumers
         final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();
 
         Map<String, Object> options = new HashMap<String, Object>();
         options.put(ClientTransport.MAX_NETWORK_DELAY_OPTION, httpClient.getTimeout());
 
+        final SalesforceSession session = component.getSession();
         // check login access token
         if (session.getAccessToken() == null) {
             // lazy login here!
@@ -340,12 +342,12 @@ public class SubscriptionHelper extends ServiceSupport {
             }
         };
 
-        BayeuxClient client = new BayeuxClient(getEndpointUrl(), transport);
+        BayeuxClient client = new BayeuxClient(getEndpointUrl(component), transport);
         Integer replayId = null;
         String channelName = getChannelName(topicName);
         Map<String, Integer> replayIdMap = component.getConfig().getInitialReplayIdMap();
         if (replayIdMap != null) {
-            replayId = replayIdMap.get(channelName);
+            replayId = replayIdMap.getOrDefault(topicName, replayIdMap.get(channelName));
         }
         if (replayId == null) {
             replayId = component.getConfig().getDefaultReplayId();
@@ -419,7 +421,7 @@ public class SubscriptionHelper extends ServiceSupport {
         clientChannel.subscribe(listener);
     }
 
-    private String getChannelName(String topicName) {
+    static String getChannelName(String topicName) {
         return "/topic/" + topicName;
     }
 
@@ -491,7 +493,7 @@ public class SubscriptionHelper extends ServiceSupport {
         }
     }
 
-    public String getEndpointUrl() {
+    static String getEndpointUrl(final SalesforceComponent component) {
         // In version 36.0 replay is only enabled on a separate endpoint
         if (Double.valueOf(component.getConfig().getApiVersion()) == 36.0) {
             boolean replayOptionsPresent = component.getConfig().getDefaultReplayId() != null

http://git-wip-us.apache.org/repos/asf/camel/blob/e682e7ee/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
----------------------------------------------------------------------
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
new file mode 100644
index 0000000..b5de54a
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
+import org.apache.camel.component.salesforce.SalesforceHttpClient;
+import org.apache.camel.component.salesforce.internal.SalesforceSession;
+import org.cometd.bayeux.Channel;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSession;
+import org.cometd.bayeux.client.ClientSession.Extension;
+import org.cometd.client.BayeuxClient;
+import org.cometd.common.HashMapMessage;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class SubscriptionHelperTest {
+
+    static final ClientSession NOT_USED = null;
+
+    final SalesforceComponent component = mock(SalesforceComponent.class);
+
+    final SalesforceSession session = mock(SalesforceSession.class);
+
+    final SalesforceEndpointConfig config = mock(SalesforceEndpointConfig.class);
+
+    @Before
+    public void setupMocks() {
+        when(component.getSession()).thenReturn(session);
+
+        when(session.getInstanceUrl()).thenReturn("https://some.url");
+
+        when(component.getConfig()).thenReturn(config);
+        when(component.getConfig().getApiVersion()).thenReturn(SalesforceEndpointConfig.DEFAULT_VERSION);
+
+        when(config.getHttpClient()).thenReturn(mock(SalesforceHttpClient.class));
+
+    }
+
+    @Test
+    public void shouldSupportInitialConfigMapWithTwoKeySyntaxes() throws Exception {
+        final Map<String, Integer> initialReplayIdMap = new HashMap<>();
+        initialReplayIdMap.put("my-topic-1", 10);
+        initialReplayIdMap.put("/topic/my-topic-1", 20);
+        initialReplayIdMap.put("/topic/my-topic-2", 30);
+
+        when(config.getDefaultReplayId()).thenReturn(14);
+
+        when(config.getInitialReplayIdMap()).thenReturn(initialReplayIdMap);
+
+        assertEquals("Expecting replayId for `my-topic-1` to be 10, as short topic names have priority", (Object) 10,
+                fetchReplayExtensionValue("my-topic-1").get("/topic/my-topic-1"));
+
+        assertEquals("Expecting replayId for `my-topic-2` to be 30, the only one given", (Object) 30,
+                fetchReplayExtensionValue("my-topic-2").get("/topic/my-topic-2"));
+
+        assertEquals("Expecting replayId for `my-topic-3` to be 14, the default", (Object) 14,
+                fetchReplayExtensionValue("my-topic-3").get("/topic/my-topic-3"));
+    }
+
+    Map<String, Integer> fetchReplayExtensionValue(final String topicName) throws Exception {
+        final BayeuxClient client = SubscriptionHelper.createClient(component, topicName);
+
+        final List<Extension> extensions = client.getExtensions();
+
+        final Optional<Extension> extension = extensions.stream().filter(e -> e instanceof CometDReplayExtension)
+                .findFirst();
+
+        assertTrue("Client should be configured with CometDReplayExtension extension", extension.isPresent());
+
+        final CometDReplayExtension cometDReplayExtension = (CometDReplayExtension) extension.get();
+
+        final Message.Mutable handshake = new HashMapMessage();
+        handshake.setChannel(Channel.META_HANDSHAKE);
+        handshake.put(Message.EXT_FIELD, Collections.singletonMap("replay", true));
+
+        cometDReplayExtension.rcvMeta(NOT_USED, handshake);
+
+        final Message.Mutable subscription = new HashMapMessage();
+        subscription.setChannel(Channel.META_SUBSCRIBE);
+        cometDReplayExtension.sendMeta(NOT_USED, subscription);
+
+        @SuppressWarnings("unchecked")
+        final Map<String, Integer> replays = (Map<String, Integer>) subscription.getExt().get("replay");
+
+        return replays;
+    }
+}