You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/07 05:51:15 UTC

[camel] branch camel-2.25.x updated: CAMEL-15233: Fix Replay Extension to correctly get replayId from message (#3975)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.25.x by this push:
     new e692131  CAMEL-15233: Fix Replay Extension to correctly get replayId from message (#3975)
e692131 is described below

commit e6921313a25084f23dd6562b1480f724b9cf981a
Author: edgarc-ciandt <ed...@ciandt.com>
AuthorDate: Tue Jul 7 02:51:01 2020 -0300

    CAMEL-15233: Fix Replay Extension to correctly get replayId from message (#3975)
---
 .../internal/client/DefaultBulkApiClient.java      |   5 +-
 ...tDReplayExtension.java => ReplayExtension.java} |  86 +++++++-------
 .../internal/streaming/SubscriptionHelper.java     |   2 +-
 .../internal/streaming/ReplayExtensionTest.java    | 127 +++++++++++++++++++++
 4 files changed, 170 insertions(+), 50 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
index bf14908..6a83d62 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java
@@ -34,6 +34,9 @@ import javax.xml.parsers.SAXParserFactory;
 import javax.xml.transform.Source;
 import javax.xml.transform.sax.SAXSource;
 
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
 import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.api.dto.RestError;
@@ -53,8 +56,6 @@ import org.eclipse.jetty.client.util.InputStreamContentProvider;
 import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpMethod;
 import org.eclipse.jetty.util.StringUtil;
-import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
 
 public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiClient {
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
similarity index 55%
rename from components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
rename to components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
index f0156ef..96355e1 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
@@ -14,40 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-/**
- * Copyright (c) 2016, Salesforce Developers
+/*
+ * Copyright (c) 2016, salesforce.com, inc.
  * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
- * 3. Neither the name of the copyright holder nor the names of its
- *    contributors may be used to endorse or promote products derived from
- *    this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- **/
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.TXT file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
 package org.apache.camel.component.salesforce.internal.streaming;
 
+
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import org.cometd.bayeux.Channel;
 import org.cometd.bayeux.Message;
@@ -55,14 +36,16 @@ import org.cometd.bayeux.client.ClientSession;
 import org.cometd.bayeux.client.ClientSession.Extension.Adapter;
 
 /**
- * CometDReplayExtension, typical usages are the following:
- * {@code client.addExtension(new CometDReplayExtension<>(replayMap));}
+ * The Bayeux extension for replay
  *
- * @author yzhao
- * @since 198 (Winter '16)
+ * @author hal.hildebrand
+ * @since API v37.0
  */
-public class CometDReplayExtension extends Adapter {
+public class ReplayExtension extends Adapter {
     private static final String EXTENSION_NAME = "replay";
+    private static final String EVENT_KEY = "event";
+    private static final String REPLAY_ID_KEY = "replayId";
+
     private final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<>();
     private final AtomicBoolean supported = new AtomicBoolean();
 
@@ -72,20 +55,11 @@ public class CometDReplayExtension extends Adapter {
 
     @Override
     public boolean rcv(ClientSession session, Message.Mutable message) {
-        final Object value = message.get(EXTENSION_NAME);
-
-        final Long replayId;
-        if (value instanceof Long) {
-            replayId = (Long)value;
-        } else if (value instanceof Number) {
-            replayId = ((Number)value).longValue();
-        } else {
-            replayId = null;
-        }
-
+        Long replayId = getReplayId(message);
         if (this.supported.get() && replayId != null) {
             try {
-                dataMap.put(message.getChannel(), replayId);
+                String channel = topicWithoutQueryString(message.getChannel());
+                dataMap.put(channel, replayId);
             } catch (ClassCastException e) {
                 return false;
             }
@@ -101,7 +75,6 @@ public class CometDReplayExtension extends Adapter {
             this.supported.set(ext != null && Boolean.TRUE.equals(ext.get(EXTENSION_NAME)));
             break;
         default:
-            break;
         }
         return true;
     }
@@ -118,8 +91,27 @@ public class CometDReplayExtension extends Adapter {
             }
             break;
         default:
-            break;
         }
         return true;
     }
-}
+
+    private static Long getReplayId(Message.Mutable message) {
+        Map<String, Object> data = message.getDataAsMap();
+        @SuppressWarnings("unchecked")
+        Optional<Long> optional = resolve(() -> (Long)((Map<String, Object>)data.get(EVENT_KEY)).get(REPLAY_ID_KEY));
+        return optional.orElse(null);
+    }
+
+    private static <T> Optional<T> resolve(Supplier<T> resolver) {
+        try {
+            T result = resolver.get();
+            return Optional.ofNullable(result);
+        } catch (NullPointerException e) {
+            return Optional.empty();
+        }
+    }
+
+    private static String topicWithoutQueryString(String fullTopic) {
+        return fullTopic.split("\\?")[0];
+    }
+}
\ No newline at end of file
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index c10aa92..ea1f6d0 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -58,7 +58,7 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
 
 public class SubscriptionHelper extends ServiceSupport {
 
-    static final CometDReplayExtension REPLAY_EXTENSION = new CometDReplayExtension();
+    static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
 
     private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
new file mode 100644
index 0000000..287234c
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.cometd.bayeux.Channel;
+import org.cometd.bayeux.Message;
+import org.cometd.common.HashMapMessage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReplayExtensionTest {
+
+    static Message.Mutable createPushTopicMessage(boolean addReplayId) {
+        final Message.Mutable pushTopicMessage = new HashMapMessage();
+        pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+        final Map<String, Object> data = new HashMap<>();
+        pushTopicMessage.put("data", data);
+
+        final Map<String, Object> event = new HashMap<>();
+        data.put("event", event);
+
+        event.put("createdDate", "2016-09-16T19:45:27.454Z");
+        if (addReplayId) {
+            event.put("replayId", 1L);
+        }
+        event.put("type", "created");
+
+        final Map<String, Object> sobject = new HashMap<>();
+        data.put("sobject", sobject);
+
+        sobject.put("Phone", "(415) 555-1212");
+        sobject.put("Id", "001D000000KneakIAB");
+        sobject.put("Name", "Blackbeard");
+
+        pushTopicMessage.put("channel", "/topic/AccountUpdates");
+        return pushTopicMessage;
+    }
+
+    static Message.Mutable createHandshakeMessage(Boolean isReplaySupported) {
+        final Message.Mutable handshakeMessage = new HashMapMessage();
+        HashMap<String, Object> ext = new HashMap<>();
+        handshakeMessage.put("ext", ext);
+        handshakeMessage.put("channel", Channel.META_HANDSHAKE);
+        ext.put("replay", isReplaySupported);
+
+        return handshakeMessage;
+    }
+
+    @SuppressWarnings("unchecked")
+    static ConcurrentMap<String, Long> getDataMap(ReplayExtension replayExtension)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = ReplayExtension.class.getDeclaredField("dataMap");
+        field.setAccessible(true);
+
+        return (ConcurrentMap<String, Long>) field.get(replayExtension);
+    }
+
+    @Test
+    public void shouldKeepPreviousValueIfReplayIdNotInMessageWhenIsSupported()
+            throws NoSuchFieldException, IllegalAccessException {
+        final Message.Mutable pushTopicMessage = createPushTopicMessage(false);
+
+        final ReplayExtension replayExtension = new ReplayExtension();
+        replayExtension.rcvMeta(null, createHandshakeMessage(true));
+
+        replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L);
+
+        replayExtension.rcv(null, pushTopicMessage);
+
+        ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+        assertEquals(Long.valueOf(123L), dataMap.get("/topic/AccountUpdates"));
+    }
+
+    @Test
+    public void shouldUpdateReplayIdFromMessageWhenIsSupported() throws NoSuchFieldException, IllegalAccessException {
+        final Message.Mutable pushTopicMessage = createPushTopicMessage(true);
+
+        final ReplayExtension replayExtension = new ReplayExtension();
+        replayExtension.rcvMeta(null, createHandshakeMessage(true));
+
+        replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L);
+
+        replayExtension.rcv(null, pushTopicMessage);
+
+        ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+        assertEquals(Long.valueOf(1L), dataMap.get("/topic/AccountUpdates"));
+
+    }
+
+    @Test
+    public void shouldNotUpdateReplayIdFromMessageWhenIsNotSupported()
+            throws NoSuchFieldException, IllegalAccessException {
+        final Message.Mutable pushTopicMessage = createPushTopicMessage(true);
+
+        final ReplayExtension replayExtension = new ReplayExtension();
+        replayExtension.rcvMeta(null, createHandshakeMessage(false));
+
+        replayExtension.rcv(null, pushTopicMessage);
+
+        ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+        assertEquals(0, dataMap.size());
+    }
+}