You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/07/03 08:55:06 UTC
[camel] branch master updated: CAMEL-15233: Fixes Replay Extension to properly save replay id value (#3969)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 0fc71ae CAMEL-15233: Fixes Replay Extension to properly save replay id value (#3969)
0fc71ae is described below
commit 0fc71ae176cb0878bc60e89b5e32ec363ab2344b
Author: edgarc-ciandt <ed...@ciandt.com>
AuthorDate: Fri Jul 3 05:54:55 2020 -0300
CAMEL-15233: Fixes Replay Extension to properly save replay id value (#3969)
* CAMEL-15233: Fix Replay Extension to correctly get replayId from message
* CAMEL-15233: Fix check style issues
* CAMEL-15233: Remove ReflectionUtil reference from tests
* CAMEL-15233: Remove Commented out code
* CAMEL-15233: Move method within class
* CAMEL-15233: Fix CheckStyle issues
---
...tDReplayExtension.java => ReplayExtension.java} | 86 +++++++-------
.../internal/streaming/SubscriptionHelper.java | 2 +-
.../internal/streaming/ReplayExtensionTest.java | 127 +++++++++++++++++++++
3 files changed, 167 insertions(+), 48 deletions(-)
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
similarity index 56%
rename from components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
rename to components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
index ec9a523..dc5ff20 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java
@@ -14,40 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- * Copyright (c) 2016, Salesforce Developers
+/*
+ * Copyright (c) 2016, salesforce.com, inc.
* All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * 3. Neither the name of the copyright holder nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- **/
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.TXT file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
package org.apache.camel.component.salesforce.internal.streaming;
+
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
@@ -55,14 +36,16 @@ import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSession.Extension.Adapter;
/**
- * CometDReplayExtension, typical usages are the following:
- * {@code client.addExtension(new CometDReplayExtension<>(replayMap));}
+ * The Bayeux extension for replay
*
- * @author yzhao
- * @since 198 (Winter '16)
+ * @author hal.hildebrand
+ * @since API v37.0
*/
-public class CometDReplayExtension extends Adapter {
+public class ReplayExtension extends Adapter {
private static final String EXTENSION_NAME = "replay";
+ private static final String EVENT_KEY = "event";
+ private static final String REPLAY_ID_KEY = "replayId";
+
private final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<>();
private final AtomicBoolean supported = new AtomicBoolean();
@@ -72,20 +55,11 @@ public class CometDReplayExtension extends Adapter {
@Override
public boolean rcv(ClientSession session, Message.Mutable message) {
- final Object value = message.get(EXTENSION_NAME);
-
- final Long replayId;
- if (value instanceof Long) {
- replayId = (Long)value;
- } else if (value instanceof Number) {
- replayId = ((Number)value).longValue();
- } else {
- replayId = null;
- }
-
+ Long replayId = getReplayId(message);
if (this.supported.get() && replayId != null) {
try {
- dataMap.put(message.getChannel(), replayId);
+ String channel = topicWithoutQueryString(message.getChannel());
+ dataMap.put(channel, replayId);
} catch (ClassCastException e) {
return false;
}
@@ -101,7 +75,6 @@ public class CometDReplayExtension extends Adapter {
this.supported.set(ext != null && Boolean.TRUE.equals(ext.get(EXTENSION_NAME)));
break;
default:
- break;
}
return true;
}
@@ -118,8 +91,27 @@ public class CometDReplayExtension extends Adapter {
}
break;
default:
- break;
}
return true;
}
-}
+
+ private static Long getReplayId(Message.Mutable message) {
+ Map<String, Object> data = message.getDataAsMap();
+ @SuppressWarnings("unchecked")
+ Optional<Long> optional = resolve(() -> (Long)((Map<String, Object>)data.get(EVENT_KEY)).get(REPLAY_ID_KEY));
+ return optional.orElse(null);
+ }
+
+ private static <T> Optional<T> resolve(Supplier<T> resolver) {
+ try {
+ T result = resolver.get();
+ return Optional.ofNullable(result);
+ } catch (NullPointerException e) {
+ return Optional.empty();
+ }
+ }
+
+ private static String topicWithoutQueryString(String fullTopic) {
+ return fullTopic.split("\\?")[0];
+ }
+}
\ No newline at end of file
diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index 711c1dd..faf530f 100644
--- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -57,7 +57,7 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
public class SubscriptionHelper extends ServiceSupport {
- static final CometDReplayExtension REPLAY_EXTENSION = new CometDReplayExtension();
+ static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
new file mode 100644
index 0000000..6243d85
--- /dev/null
+++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.salesforce.internal.streaming;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import org.cometd.bayeux.Channel;
+import org.cometd.bayeux.Message;
+import org.cometd.common.HashMapMessage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReplayExtensionTest {
+
+ static Message.Mutable createPushTopicMessage(boolean addReplayId) {
+ final Message.Mutable pushTopicMessage = new HashMapMessage();
+ pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k");
+
+ final Map<String, Object> data = new HashMap<>();
+ pushTopicMessage.put("data", data);
+
+ final Map<String, Object> event = new HashMap<>();
+ data.put("event", event);
+
+ event.put("createdDate", "2016-09-16T19:45:27.454Z");
+ if (addReplayId) {
+ event.put("replayId", 1L);
+ }
+ event.put("type", "created");
+
+ final Map<String, Object> sobject = new HashMap<>();
+ data.put("sobject", sobject);
+
+ sobject.put("Phone", "(415) 555-1212");
+ sobject.put("Id", "001D000000KneakIAB");
+ sobject.put("Name", "Blackbeard");
+
+ pushTopicMessage.put("channel", "/topic/AccountUpdates");
+ return pushTopicMessage;
+ }
+
+ static Message.Mutable createHandshakeMessage(Boolean isReplaySupported) {
+ final Message.Mutable handshakeMessage = new HashMapMessage();
+ HashMap<String, Object> ext = new HashMap<>();
+ handshakeMessage.put("ext", ext);
+ handshakeMessage.put("channel", Channel.META_HANDSHAKE);
+ ext.put("replay", isReplaySupported);
+
+ return handshakeMessage;
+ }
+
+ @SuppressWarnings("unchecked")
+ static ConcurrentMap<String, Long> getDataMap(ReplayExtension replayExtension)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = ReplayExtension.class.getDeclaredField("dataMap");
+ field.setAccessible(true);
+
+ return (ConcurrentMap<String, Long>) field.get(replayExtension);
+ }
+
+ @Test
+ public void shouldKeepPreviousValueIfReplayIdNotInMessageWhenIsSupported()
+ throws NoSuchFieldException, IllegalAccessException {
+ final Message.Mutable pushTopicMessage = createPushTopicMessage(false);
+
+ final ReplayExtension replayExtension = new ReplayExtension();
+ replayExtension.rcvMeta(null, createHandshakeMessage(true));
+
+ replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L);
+
+ replayExtension.rcv(null, pushTopicMessage);
+
+ ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+ assertEquals(Long.valueOf(123L), dataMap.get("/topic/AccountUpdates"));
+ }
+
+ @Test
+ public void shouldUpdateReplayIdFromMessageWhenIsSupported() throws NoSuchFieldException, IllegalAccessException {
+ final Message.Mutable pushTopicMessage = createPushTopicMessage(true);
+
+ final ReplayExtension replayExtension = new ReplayExtension();
+ replayExtension.rcvMeta(null, createHandshakeMessage(true));
+
+ replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L);
+
+ replayExtension.rcv(null, pushTopicMessage);
+
+ ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+ assertEquals(Long.valueOf(1L), dataMap.get("/topic/AccountUpdates"));
+
+ }
+
+ @Test
+ public void shouldNotUpdateReplayIdFromMessageWhenIsNotSupported()
+ throws NoSuchFieldException, IllegalAccessException {
+ final Message.Mutable pushTopicMessage = createPushTopicMessage(true);
+
+ final ReplayExtension replayExtension = new ReplayExtension();
+ replayExtension.rcvMeta(null, createHandshakeMessage(false));
+
+ replayExtension.rcv(null, pushTopicMessage);
+
+ ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension);
+
+ assertEquals(0, dataMap.size());
+ }
+}