You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/03 08:55:06 UTC

[camel] branch master updated: CAMEL-15233: Fixes Replay Extension to properly save replay id value (#3969)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fc71ae  CAMEL-15233: Fixes Replay Extension to properly save replay id value (#3969)
0fc71ae is described below

commit 0fc71ae176cb0878bc60e89b5e32ec363ab2344b
Author: edgarc-ciandt <ed...@ciandt.com>
AuthorDate: Fri Jul 3 05:54:55 2020 -0300

    CAMEL-15233: Fixes Replay Extension to properly save replay id value (#3969)
    
    * CAMEL-15233: Fix Replay Extension to correctly get replayId from message
    
    * CAMEL-15233: Fix check style issues
    
    * CAMEL-15233: Remove ReflectionUtil reference from tests
    
    * CAMEL-15233: Remove Commented out code
    
    * CAMEL-15233: Move method within class
    
    * CAMEL-15233: Fix CheckStyle issues
---
 ...tDReplayExtension.java => ReplayExtension.java} |  86 +++++++-------
 .../internal/streaming/SubscriptionHelper.java     |   2 +-
 .../internal/streaming/ReplayExtensionTest.java    | 127 +++++++++++++++++++++
 3 files changed, 167 insertions(+), 48 deletions(-)

diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
similarity index 56%
rename from components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
rename to components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
index ec9a523..dc5ff20 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
@@ -14,40 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-/**
- * Copyright (c) 2016, Salesforce Developers
+/*
+ * Copyright (c) 2016, salesforce.com, inc.
  * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- *    notice, this list of conditions and the following disclaimer in the
- *    documentation and/or other materials provided with the distribution.
- * 3. Neither the name of the copyright holder nor the names of its
- *    contributors may be used to endorse or promote products derived from
- *    this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- **/
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.TXT file in the repo root  or https://opensource.org/licenses/BSD-3-Clause
+ */
 package org.apache.camel.component.salesforce.internal.streaming;
 
+
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import org.cometd.bayeux.Channel;
 import org.cometd.bayeux.Message;
@@ -55,14 +36,16 @@ import org.cometd.bayeux.client.ClientSession;
 import org.cometd.bayeux.client.ClientSession.Extension.Adapter;
 
 /**
- * CometDReplayExtension, typical usages are the following:
- * {@code client.addExtension(new CometDReplayExtension<>(replayMap));}
+ * The Bayeux extension for replay
  *
- * @author yzhao
- * @since 198 (Winter '16)
+ * @author hal.hildebrand
+ * @since API v37.0
  */
-public class CometDReplayExtension extends Adapter {
+public class ReplayExtension extends Adapter {
     private static final String EXTENSION_NAME = "replay";
+    private static final String EVENT_KEY = "event";
+    private static final String REPLAY_ID_KEY = "replayId";
+
     private final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<>();
     private final AtomicBoolean supported = new AtomicBoolean();
 
@@ -72,20 +55,11 @@ public class CometDReplayExtension extends Adapter {
 
     @Override
     public boolean rcv(ClientSession session, Message.Mutable message) {
-        final Object value = message.get(EXTENSION_NAME);
-
-        final Long replayId;
-        if (value instanceof Long) {
-            replayId = (Long)value;
-        } else if (value instanceof Number) {
-            replayId = ((Number)value).longValue();
-        } else {
-            replayId = null;
-        }
-
+        Long replayId = getReplayId(message);
         if (this.supported.get() && replayId != null) {
             try {
-                dataMap.put(message.getChannel(), replayId);
+                String channel = topicWithoutQueryString(message.getChannel());
+                dataMap.put(channel, replayId);
             } catch (ClassCastException e) {
                 return false;
             }
@@ -101,7 +75,6 @@ public class CometDReplayExtension extends Adapter {
                 this.supported.set(ext != null && Boolean.TRUE.equals(ext.get(EXTENSION_NAME)));
                 break;
             default:
-                break;
         }
         return true;
     }
@@ -118,8 +91,27 @@ public class CometDReplayExtension extends Adapter {
                 }
                 break;
             default:
-                break;
         }
         return true;
     }
-}
+
+    private static Long getReplayId(Message.Mutable message) {
+        Map<String, Object> data = message.getDataAsMap();
+        @SuppressWarnings("unchecked")
+        Optional<Long> optional = resolve(() -> (Long)((Map<String, Object>)data.get(EVENT_KEY)).get(REPLAY_ID_KEY));
+        return optional.orElse(null);
+    }
+
+    private static <T> Optional<T> resolve(Supplier<T> resolver) {
+        try {
+            T result = resolver.get();
+            return Optional.ofNullable(result);
+        } catch (NullPointerException e) {
+            return Optional.empty();
+        }
+    }
+
+    private static String topicWithoutQueryString(String fullTopic) {
+        return fullTopic.split("\\?")[0];
+    }
+}
\ No newline at end of file
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 711c1dd..faf530f 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -57,7 +57,7 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
 
 public class SubscriptionHelper extends ServiceSupport {
 
-    static final CometDReplayExtension REPLAY_EXTENSION = new CometDReplayExtension();
+    static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
 
     private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
 
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
new file mode 100644
index 0000000..6243d85
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.cometd.bayeux.Channel;
+import org.cometd.bayeux.Message;
+import org.cometd.common.HashMapMessage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReplayExtensionTest {
+
+    static Message.Mutable createPushTopicMessage(boolean addReplayId) {
+        final Message.Mutable pushTopicMessage = new HashMapMessage();
+        pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+        final Map<String, Object> data = new HashMap<>();
+        pushTopicMessage.put("data", data);
+
+        final Map<String, Object> event = new HashMap<>();
+        data.put("event", event);
+
+        event.put("createdDate", "2016-09-16T19:45:27.454Z");
+        if (addReplayId) {
+            event.put("replayId", 1L);
+        }
+        event.put("type", "created");
+
+        final Map<String, Object> sobject = new HashMap<>();
+        data.put("sobject", sobject);
+
+        sobject.put("Phone", "(415) 555-1212");
+        sobject.put("Id", "001D000000KneakIAB");
+        sobject.put("Name", "Blackbeard");
+
+        pushTopicMessage.put("channel", "/topic/AccountUpdates");
+        return pushTopicMessage;
+    }
+
+    static Message.Mutable createHandshakeMessage(Boolean isReplaySupported) {
+        final Message.Mutable handshakeMessage = new HashMapMessage();
+        HashMap<String, Object> ext = new HashMap<>();
+        handshakeMessage.put("ext", ext);
+        handshakeMessage.put("channel", Channel.META_HANDSHAKE);
+        ext.put("replay", isReplaySupported);
+
+        return handshakeMessage;
+    }
+
+    @SuppressWarnings("unchecked")
+    static ConcurrentMap<String, Long> getDataMap(ReplayExtension replayExtension)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field field = ReplayExtension.class.getDeclaredField("dataMap");
+        field.setAccessible(true);
+
+        return (ConcurrentMap<String, Long>) field.get(replayExtension);
+    }
+
+    @Test
+    public void shouldKeepPreviousValueIfReplayIdNotInMessageWhenIsSupported()
+            throws NoSuchFieldException, IllegalAccessException {
+        final Message.Mutable pushTopicMessage = createPushTopicMessage(false);
+
+        final ReplayExtension replayExtension = new ReplayExtension();
+        replayExtension.rcvMeta(null, createHandshakeMessage(true));
+
+        replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L);
+
+        replayExtension.rcv(null, pushTopicMessage);
+
+        ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+        assertEquals(Long.valueOf(123L), dataMap.get("/topic/AccountUpdates"));
+    }
+
+    @Test
+    public void shouldUpdateReplayIdFromMessageWhenIsSupported() throws NoSuchFieldException, IllegalAccessException {
+        final Message.Mutable pushTopicMessage = createPushTopicMessage(true);
+
+        final ReplayExtension replayExtension = new ReplayExtension();
+        replayExtension.rcvMeta(null, createHandshakeMessage(true));
+
+        replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L);
+
+        replayExtension.rcv(null, pushTopicMessage);
+
+        ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+        assertEquals(Long.valueOf(1L), dataMap.get("/topic/AccountUpdates"));
+
+    }
+
+    @Test
+    public void shouldNotUpdateReplayIdFromMessageWhenIsNotSupported()
+            throws NoSuchFieldException, IllegalAccessException {
+        final Message.Mutable pushTopicMessage = createPushTopicMessage(true);
+
+        final ReplayExtension replayExtension = new ReplayExtension();
+        replayExtension.rcvMeta(null, createHandshakeMessage(false));
+
+        replayExtension.rcv(null, pushTopicMessage);
+
+        ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+        assertEquals(0, dataMap.size());
+    }
+}