You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/12 19:56:27 UTC
[08/11] camel git commit: refactor component and upgrade PubNub library
refactor component and upgrade PubNub library
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/13042923
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/13042923
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/13042923
Branch: refs/heads/master
Commit: 13042923d6451cfb3e3fa0eb7449aa309b9dce04
Parents: 289a672
Author: Preben Asmussen <pr...@gmail.com>
Authored: Sun Apr 2 14:27:09 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Apr 12 21:56:07 2017 +0200
----------------------------------------------------------------------
components/camel-pubnub/pom.xml | 27 ++-
.../camel/component/pubnub/JsonConverter.java | 46 ----
.../camel/component/pubnub/PubNubComponent.java | 16 +-
.../component/pubnub/PubNubConfiguration.java | 61 +++--
.../camel/component/pubnub/PubNubConstants.java | 10 +-
.../camel/component/pubnub/PubNubConsumer.java | 89 ++++---
.../camel/component/pubnub/PubNubEndpoint.java | 163 ++-----------
.../component/pubnub/PubNubEndpointType.java | 32 ---
.../camel/component/pubnub/PubNubProducer.java | 242 ++++++++++++++-----
.../services/org/apache/camel/TypeConverter | 18 --
.../component/pubnub/PubNubComponentTest.java | 61 -----
.../pubnub/PubNubConfigurationTest.java | 29 +--
.../pubnub/PubNubEmptyPayloadTest.java | 55 -----
.../camel/component/pubnub/PubNubMock.java | 186 --------------
.../component/pubnub/PubNubOperationsTest.java | 110 ++++++---
.../component/pubnub/PubNubPresensTest.java | 50 ++--
.../component/pubnub/PubNubPublishTest.java | 93 +++++++
.../component/pubnub/PubNubSubscriberTest.java | 60 +++++
.../camel/component/pubnub/PubNubTestBase.java | 107 ++++++++
.../pubnub/example/PubNubExampleConstants.java | 4 +-
.../pubnub/example/PubNubOperationsExample.java | 61 -----
.../pubnub/example/PubNubPresenseExample.java | 6 +-
.../pubnub/example/PubNubSensor2Example.html | 143 ++++++-----
.../pubnub/example/PubNubSensor2Example.java | 82 ++++---
.../pubnub/example/PubNubSensorExample.java | 2 +-
.../src/test/resources/log4j2.properties | 2 +-
parent/pom.xml | 2 +-
.../spring-boot/components-starter/pom.xml | 1 +
28 files changed, 840 insertions(+), 918 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/pom.xml b/components/camel-pubnub/pom.xml
index 268cb2a..5c831df 100644
--- a/components/camel-pubnub/pom.xml
+++ b/components/camel-pubnub/pom.xml
@@ -37,7 +37,7 @@
</dependency>
<dependency>
<groupId>com.pubnub</groupId>
- <artifactId>pubnub</artifactId>
+ <artifactId>pubnub-gson</artifactId>
<version>${pubnub-version}</version>
</dependency>
<dependency>
@@ -45,16 +45,24 @@
<artifactId>gson</artifactId>
<version>${gson-version}</version>
</dependency>
+
+ <!-- logging -->
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>${commons-lang3-version}</version>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.json</groupId>
- <artifactId>json</artifactId>
- <version>20160810</version>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
</dependency>
+
<!-- testing -->
<dependency>
<groupId>org.apache.camel</groupId>
@@ -72,8 +80,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-netty-http</artifactId>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock</artifactId>
+ <version>${wiremock-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/JsonConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/JsonConverter.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/JsonConverter.java
deleted file mode 100644
index e5f09a0..0000000
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/JsonConverter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.pubnub;
-
-import org.apache.camel.Converter;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-@Converter
-public final class JsonConverter {
- private JsonConverter() {
- }
-
- @Converter
- public static JSONObject toJsonObject(String json) {
- try {
- return new JSONObject(json);
- } catch (JSONException e) {
- return null;
- }
- }
-
- @Converter
- public static JSONArray toJsonArray(String json) {
- try {
- return new JSONArray(json);
- } catch (JSONException e) {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubComponent.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubComponent.java
index 9eb4e61..b35e05d 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubComponent.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubComponent.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
/**
* Represents the component that manages {@link PubNubEndpoint}.
@@ -37,17 +38,12 @@ public class PubNubComponent extends DefaultComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- String[] uriParts = remaining.split(":");
- if (uriParts.length != 2) {
- throw new IllegalArgumentException("Invalid Endpoint URI: " + uri + ". It should contains a valid endpointType and channel");
- }
- PubNubEndpointType endpointType = PubNubEndpointType.valueOf(uriParts[0]);
- String channel = uriParts[1];
-
- PubNubEndpoint endpoint = new PubNubEndpoint(uri, this);
+ ObjectHelper.notNull(remaining, "channel");
+ PubNubConfiguration pubNubConfiguration = new PubNubConfiguration();
+ pubNubConfiguration.setChannel(remaining);
+ setProperties(pubNubConfiguration, parameters);
+ PubNubEndpoint endpoint = new PubNubEndpoint(uri, this, pubNubConfiguration);
setProperties(endpoint, parameters);
- endpoint.setEndpointType(endpointType);
- endpoint.setChannel(channel);
return endpoint;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConfiguration.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConfiguration.java
index b506508..65342fa 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConfiguration.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConfiguration.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.pubnub;
-import com.pubnub.api.Pubnub;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
@@ -25,12 +24,6 @@ import org.apache.camel.spi.UriPath;
@UriParams
public class PubNubConfiguration {
- @UriParam
- private Pubnub pubnub;
-
- @UriPath(enums = "pubsub,presence")
- @Metadata(required = "true", defaultValue = "pubsub")
- private PubNubEndpointType endpointType = PubNubEndpointType.pubsub;
@UriPath()
@Metadata(required = "true")
@@ -45,24 +38,26 @@ public class PubNubConfiguration {
@UriParam()
private String secretKey;
+ @UriParam()
+ private String authKey;
+
@UriParam(defaultValue = "true")
- private boolean ssl = true;
+ private boolean secure = true;
@UriParam()
private String uuid;
- @UriParam(label = "producer", enums = "HERE_NOW, WHERE_NOW, GET_STATE, SET_STATE, GET_HISTORY, PUBLISH")
+ @UriParam(label = "producer", enums = "HERE_NOW,WHERE_NOW,GET_STATE,SET_STATE,GET_HISTORY,PUBLISH,FIRE")
private String operation;
- public PubNubEndpointType getEndpointType() {
- return endpointType;
- }
+ @UriParam(label = "consumer", defaultValue = "false")
+ private boolean withPresence;
/**
* The publish key obtained from your PubNub account. Required when publishing messages.
*/
public String getPublisherKey() {
- return publisherKey;
+ return this.publisherKey;
}
public void setPublisherKey(String publisherKey) {
@@ -73,7 +68,7 @@ public class PubNubConfiguration {
* The subscribe key obtained from your PubNub account. Required when subscribing to channels or listening for presence events
*/
public String getSubscriberKey() {
- return subscriberKey;
+ return this.subscriberKey;
}
public void setSubscriberKey(String subscriberKey) {
@@ -84,7 +79,7 @@ public class PubNubConfiguration {
* The secret key used for message signing.
*/
public String getSecretKey() {
- return secretKey;
+ return this.secretKey;
}
public void setSecretKey(String secretKey) {
@@ -92,21 +87,32 @@ public class PubNubConfiguration {
}
/**
+ * If Access Manager is utilized, client will use this authKey in all restricted requests.
+ */
+ public String getAuthKey() {
+ return authKey;
+ }
+
+ public void setAuthKey(String authKey) {
+ this.authKey = authKey;
+ }
+
+ /**
* Use ssl
*/
- public boolean isSsl() {
- return ssl;
+ public boolean isSecure() {
+ return this.secure;
}
- public void setSsl(boolean ssl) {
- this.ssl = ssl;
+ public void setSecure(boolean secure) {
+ this.secure = secure;
}
/**
* The channel used for subscribing/publishing events
*/
public String getChannel() {
- return channel;
+ return this.channel;
}
public void setChannel(String channel) {
@@ -121,7 +127,7 @@ public class PubNubConfiguration {
}
public String getUuid() {
- return uuid;
+ return this.uuid;
}
/**
@@ -132,15 +138,18 @@ public class PubNubConfiguration {
}
public String getOperation() {
- return operation;
+ return this.operation;
}
- public Pubnub getPubnub() {
- return pubnub;
+ /**
+ * Also subscribe to related presence information
+ */
+ public void setWithPresence(boolean withPresence) {
+ this.withPresence = withPresence;
}
- public void setPubnub(Pubnub pubnub) {
- this.pubnub = pubnub;
+ public boolean withPresence() {
+ return withPresence;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConstants.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConstants.java
index 30adae0..9bfdb58 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConstants.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConstants.java
@@ -16,9 +16,9 @@
*/
package org.apache.camel.component.pubnub;
-public interface PubNubConstants {
- String OPERATION = "CamelPubNubOperation";
- String TIMETOKEN = "CamelPubNubTimeToken";
- String CHANNEL = "CamelPubNubChannel";
- String UUID = "CamelPubNubUUID";
+public abstract class PubNubConstants {
+ public static String OPERATION = "CamelPubNubOperation";
+ public static String TIMETOKEN = "CamelPubNubTimeToken";
+ public static String CHANNEL = "CamelPubNubChannel";
+ public static String UUID = "CamelPubNubUUID";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
index 8b8002a..2ae6529 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubConsumer.java
@@ -16,8 +16,14 @@
*/
package org.apache.camel.component.pubnub;
-import com.pubnub.api.Callback;
-import com.pubnub.api.PubnubError;
+
+import java.util.Arrays;
+
+import com.pubnub.api.PubNub;
+import com.pubnub.api.callbacks.SubscribeCallback;
+import com.pubnub.api.models.consumer.PNStatus;
+import com.pubnub.api.models.consumer.pubsub.PNMessageResult;
+import com.pubnub.api.models.consumer.pubsub.PNPresenceEventResult;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -27,31 +33,37 @@ import org.apache.camel.impl.DefaultExchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * The PubNub consumer.
- */
+import static com.pubnub.api.enums.PNStatusCategory.PNTimeoutCategory;
+import static com.pubnub.api.enums.PNStatusCategory.PNUnexpectedDisconnectCategory;
+
+import static org.apache.camel.component.pubnub.PubNubConstants.CHANNEL;
+import static org.apache.camel.component.pubnub.PubNubConstants.TIMETOKEN;
+
public class PubNubConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(PubNubConsumer.class);
private final PubNubEndpoint endpoint;
+ private final PubNubConfiguration pubNubConfiguration;
- public PubNubConsumer(PubNubEndpoint endpoint, Processor processor) {
+ public PubNubConsumer(PubNubEndpoint endpoint, Processor processor, PubNubConfiguration pubNubConfiguration) {
super(endpoint, processor);
this.endpoint = endpoint;
+ this.pubNubConfiguration = pubNubConfiguration;
}
private void initCommunication() throws Exception {
- if (endpoint.getEndpointType().equals(PubNubEndpointType.pubsub)) {
- endpoint.getPubnub().subscribe(endpoint.getChannel(), new PubNubCallback());
+ endpoint.getPubnub().addListener(new PubNubCallback());
+ if (pubNubConfiguration.withPresence()) {
+ endpoint.getPubnub().subscribe().channels(Arrays.asList(pubNubConfiguration.getChannel())).withPresence().execute();
} else {
- endpoint.getPubnub().presence(endpoint.getChannel(), new PubNubCallback());
+ endpoint.getPubnub().subscribe().channels(Arrays.asList(pubNubConfiguration.getChannel())).execute();
}
}
- private void terminateCommunication() throws Exception {
- if (endpoint.getEndpointType().equals(PubNubEndpointType.pubsub)) {
- endpoint.getPubnub().unsubscribe(endpoint.getChannel());
- } else {
- endpoint.getPubnub().unsubscribePresence(endpoint.getChannel());
+ private void terminateCommunication() {
+ try {
+ endpoint.getPubnub().unsubscribe().channels(Arrays.asList(pubNubConfiguration.getChannel())).execute();
+ } catch (Exception e) {
+ // ignore
}
}
@@ -79,15 +91,25 @@ public class PubNubConsumer extends DefaultConsumer {
super.doSuspend();
}
- private class PubNubCallback extends Callback {
+ class PubNubCallback extends SubscribeCallback {
@Override
- public void successCallback(String channel, Object objectMessage, String timetoken) {
+ public void status(PubNub pubnub, PNStatus status) {
+ if (status.getCategory() == PNUnexpectedDisconnectCategory || status.getCategory() == PNTimeoutCategory) {
+ LOG.trace("Got status : {}. Reconnecting to PubNub", status);
+ pubnub.reconnect();
+ } else {
+ LOG.trace("Status message : {}", status);
+ }
+ }
+
+ @Override
+ public void message(PubNub pubnub, PNMessageResult message) {
Exchange exchange = new DefaultExchange(endpoint, endpoint.getExchangePattern());
- Message message = exchange.getIn();
- message.setBody(objectMessage);
- message.setHeader(PubNubConstants.TIMETOKEN, timetoken);
- message.setHeader(PubNubConstants.CHANNEL, channel);
+ Message inmessage = exchange.getIn();
+ inmessage.setBody(message);
+ inmessage.setHeader(TIMETOKEN, message.getTimetoken());
+ inmessage.setHeader(CHANNEL, message.getChannel());
try {
getProcessor().process(exchange);
} catch (Exception e) {
@@ -97,24 +119,21 @@ public class PubNubConsumer extends DefaultConsumer {
}
@Override
- public void connectCallback(String channel, Object message) {
- LOG.info("Subscriber : Successfully connected to PubNub channel {}", channel);
- }
-
- @Override
- public void errorCallback(String channel, PubnubError error) {
- LOG.error("Subscriber : Error [{}] received from PubNub on channel {}", error, channel);
- }
+ public void presence(PubNub pubnub, PNPresenceEventResult presence) {
+ Exchange exchange = new DefaultExchange(endpoint, endpoint.getExchangePattern());
+ Message inmessage = exchange.getIn();
+ inmessage.setBody(presence);
+ inmessage.setHeader(TIMETOKEN, presence.getTimetoken());
+ inmessage.setHeader(CHANNEL, presence.getChannel());
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ }
- @Override
- public void reconnectCallback(String channel, Object message) {
- LOG.info("Subscriber : Reconnected to PubNub channel {}", channel);
}
- @Override
- public void disconnectCallback(String channel, Object message) {
- LOG.trace("Subscriber : Disconnected from PubNub channel {}", channel);
- }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpoint.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpoint.java
index 2f60d2a..a58b41b 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpoint.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpoint.java
@@ -16,64 +16,42 @@
*/
package org.apache.camel.component.pubnub;
-import com.pubnub.api.Pubnub;
+
+import com.pubnub.api.PNConfiguration;
+import com.pubnub.api.PubNub;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
import org.apache.camel.util.ObjectHelper;
@UriEndpoint(scheme = "pubnub", title = "PubNub", syntax = "pubnub:endpointType:channel", consumerClass = PubNubConsumer.class, label = "cloud,iot,messaging")
public class PubNubEndpoint extends DefaultEndpoint {
@UriParam
- private Pubnub pubnub;
-
- @UriPath(enums = "pubsub,presence")
- @Metadata(required = "true")
- private PubNubEndpointType endpointType = PubNubEndpointType.pubsub;
-
- @UriPath()
- @Metadata(required = "true")
- private String channel;
-
- @UriParam()
- private String publisherKey;
-
- @UriParam()
- private String subscriberKey;
-
- @UriParam()
- private String secretKey;
-
- @UriParam(defaultValue = "true")
- private boolean ssl = true;
-
- @UriParam()
- private String uuid;
+ private PubNub pubnub;
- @UriParam(label = "producer", enums = "HERE_NOW, WHERE_NOW, GET_STATE, SET_STATE, GET_HISTORY, PUBLISH")
- private String operation;
+ @UriParam
+ private PubNubConfiguration configuration;
- public PubNubEndpoint(String uri, PubNubComponent component) {
+ public PubNubEndpoint(String uri, PubNubComponent component, PubNubConfiguration configuration) {
super(uri, component);
+ this.configuration = configuration;
}
@Override
public Producer createProducer() throws Exception {
- return new PubNubProducer(this);
+ return new PubNubProducer(this, configuration);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new PubNubConsumer(this, processor);
+ return new PubNubConsumer(this, processor, configuration);
}
@Override
@@ -81,106 +59,19 @@ public class PubNubEndpoint extends DefaultEndpoint {
return true;
}
- /**
- * The type endpoint type. Either pubsub or presence
- */
-
- public PubNubEndpointType getEndpointType() {
- return endpointType;
- }
-
- public void setEndpointType(PubNubEndpointType endpointType) {
- this.endpointType = endpointType;
- }
-
- /**
- * The pubnub publish key obtained from your pubnub account. Required when
- * publishing messages.
- */
- public String getPublisherKey() {
- return publisherKey;
- }
-
- public void setPublisherKey(String publisherKey) {
- this.publisherKey = publisherKey;
- }
-
- /**
- * The pubnub subscribe key obtained from your pubnub account. Required when
- * subscribing to channels or listening for presence events
- */
- public String getSubscriberKey() {
- return subscriberKey;
- }
-
- public void setSubscriberKey(String subscriberKey) {
- this.subscriberKey = subscriberKey;
- }
-
- /**
- * The pubnub secret key used for message signing.
- */
- public String getSecretKey() {
- return secretKey;
- }
-
- public void setSecretKey(String secretKey) {
- this.secretKey = secretKey;
- }
-
- /**
- * Use ssl
- */
- public boolean isSsl() {
- return ssl;
- }
-
- public void setSsl(boolean ssl) {
- this.ssl = ssl;
- }
-
- /**
- * The channel used for subscribing/publishing events
- */
- public String getChannel() {
- return channel;
- }
-
- public void setChannel(String channel) {
- this.channel = channel;
- }
-
- /**
- * The uuid identifying the connection. Will be auto assigned if not set.
- */
- public void setUuid(String uuid) {
- this.uuid = uuid;
- }
-
- public String getUuid() {
- return uuid;
- }
-
- /**
- * The operation to perform.
- */
- public void setOperation(String operation) {
- this.operation = operation;
- }
-
- public String getOperation() {
- return operation;
+ public PubNubConfiguration getConfiguration() {
+ return configuration;
}
/**
* Reference to a Pubnub client in the registry.
*/
- public Pubnub getPubnub() {
+ public PubNub getPubnub() {
return pubnub;
}
- public void setPubnub(Pubnub pubnub) {
+ public void setPubnub(PubNub pubnub) {
this.pubnub = pubnub;
}
@@ -188,7 +79,7 @@ public class PubNubEndpoint extends DefaultEndpoint {
protected void doStop() throws Exception {
super.doStop();
if (pubnub != null) {
- pubnub.shutdown();
+ pubnub.destroy();
pubnub = null;
}
}
@@ -199,20 +90,18 @@ public class PubNubEndpoint extends DefaultEndpoint {
super.doStart();
}
- private Pubnub getInstance() {
- Pubnub answer = null;
- if (ObjectHelper.isNotEmpty(getSecretKey())) {
- answer = new Pubnub(getPublisherKey(), getSubscriberKey(), getSecretKey(), isSsl());
- } else {
- answer = new Pubnub(getPublisherKey(), getSubscriberKey(), isSsl());
- }
- if (ObjectHelper.isNotEmpty(getUuid())) {
- answer.setUUID(getUuid());
- } else {
- String autoUUID = answer.uuid();
- setUuid(autoUUID);
- answer.setUUID(autoUUID);
+ private PubNub getInstance() {
+ PubNub answer = null;
+ PNConfiguration pnConfiguration = new PNConfiguration();
+ pnConfiguration.setPublishKey(configuration.getPublisherKey());
+ pnConfiguration.setSubscribeKey(configuration.getSubscriberKey());
+ pnConfiguration.setSecretKey(configuration.getSecretKey());
+ pnConfiguration.setAuthKey(configuration.getAuthKey());
+ pnConfiguration.setSecure(configuration.isSecure());
+ if (ObjectHelper.isNotEmpty(configuration.getUuid())) {
+ pnConfiguration.setUuid(configuration.getUuid());
}
+ answer = new PubNub(pnConfiguration);
return answer;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpointType.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpointType.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpointType.java
deleted file mode 100644
index 1454b33..0000000
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubEndpointType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.pubnub;
-
-public enum PubNubEndpointType {
- pubsub("pubsub"), presence("presence");
-
- private final String text;
-
- PubNubEndpointType(final String text) {
- this.text = text;
- }
-
- @Override
- public String toString() {
- return text;
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java
index c2fb85c..29665e2 100644
--- a/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java
+++ b/components/camel-pubnub/src/main/java/org/apache/camel/component/pubnub/PubNubProducer.java
@@ -16,17 +16,22 @@
*/
package org.apache.camel.component.pubnub;
-import com.pubnub.api.Callback;
-import com.pubnub.api.PubnubError;
+import java.util.Arrays;
+
+import com.pubnub.api.callbacks.PNCallback;
+import com.pubnub.api.models.consumer.PNPublishResult;
+import com.pubnub.api.models.consumer.PNStatus;
+import com.pubnub.api.models.consumer.history.PNHistoryResult;
+import com.pubnub.api.models.consumer.presence.PNGetStateResult;
+import com.pubnub.api.models.consumer.presence.PNHereNowResult;
+import com.pubnub.api.models.consumer.presence.PNSetStateResult;
+import com.pubnub.api.models.consumer.presence.PNWhereNowResult;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
-import org.apache.camel.InvalidPayloadException;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
-import org.json.JSONArray;
-import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,72 +41,48 @@ import org.slf4j.LoggerFactory;
public class PubNubProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(PubNubProducer.class);
private final PubNubEndpoint endpoint;
+ private final PubNubConfiguration pubnubConfiguration;
- public PubNubProducer(PubNubEndpoint endpoint) {
+ public PubNubProducer(PubNubEndpoint endpoint, PubNubConfiguration pubNubConfiguration) {
super(endpoint);
this.endpoint = endpoint;
+ this.pubnubConfiguration = pubNubConfiguration;
}
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
- Callback pubnubCallback = pubnubCallback(exchange, callback);
Operation operation = getOperation(exchange);
- LOG.trace("Executing {} operation", operation);
+
+ LOG.debug("Executing {} operation", operation);
+
switch (operation) {
case PUBLISH: {
- String channel = exchange.getIn().getHeader(PubNubConstants.CHANNEL, String.class);
- channel = channel != null ? channel : endpoint.getChannel();
- Object body = exchange.getIn().getBody();
- if (ObjectHelper.isEmpty(body)) {
- exchange.setException(new CamelException("Can not publish empty message"));
- callback.done(true);
- return true;
- }
- LOG.trace("Sending message [{}] to channel [{}]", body, channel);
- if (body.getClass().isAssignableFrom(JSONObject.class)) {
- endpoint.getPubnub().publish(channel, (JSONObject)body, pubnubCallback);
- } else if (body.getClass().isAssignableFrom(JSONArray.class)) {
- endpoint.getPubnub().publish(channel, (JSONArray)body, pubnubCallback);
- } else {
- try {
- endpoint.getPubnub().publish(channel, exchange.getIn().getMandatoryBody(String.class), pubnubCallback);
- } catch (InvalidPayloadException e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
- }
+ doPublish(exchange, callback);
+ break;
+ }
+ case FIRE: {
+ doFire(exchange, callback);
break;
}
case GET_HISTORY: {
- endpoint.getPubnub().history(endpoint.getChannel(), false, pubnubCallback);
+ doGetHistory(exchange, callback);
break;
}
case GET_STATE: {
- String uuid = exchange.getIn().getHeader(PubNubConstants.UUID, String.class);
- endpoint.getPubnub().getState(endpoint.getChannel(), uuid != null ? uuid : endpoint.getUuid(), pubnubCallback);
+ doGetState(exchange, callback);
break;
}
case HERE_NOW: {
- endpoint.getPubnub().hereNow(endpoint.getChannel(), true, true, pubnubCallback);
+ doHereNow(exchange, callback);
break;
}
case SET_STATE: {
- try {
- JSONObject state = exchange.getIn().getMandatoryBody(JSONObject.class);
- String uuid = exchange.getIn().getHeader(PubNubConstants.UUID, String.class);
- endpoint.getPubnub().setState(endpoint.getChannel(), uuid != null ? uuid : endpoint.getUuid(), state, pubnubCallback);
- } catch (InvalidPayloadException e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
+ doSetState(exchange, callback);
break;
}
case WHERE_NOW: {
- String uuid = exchange.getIn().getHeader(PubNubConstants.UUID, String.class);
- endpoint.getPubnub().whereNow(uuid != null ? uuid : endpoint.getUuid(), pubnubCallback);
+ doWhereNow(exchange, callback);
break;
}
default:
@@ -110,37 +91,170 @@ public class PubNubProducer extends DefaultAsyncProducer {
return false;
}
- private Callback pubnubCallback(final Exchange exchange, final AsyncCallback callback) {
- Callback pubnubCallback = new Callback() {
- @Override
- public void successCallback(String channel, Object message) {
- LOG.trace("PubNub response {}", message);
- exchange.getIn().setHeader(PubNubConstants.CHANNEL, channel);
- if (exchange.getPattern().isOutCapable()) {
- exchange.getOut().copyFrom(exchange.getIn());
- exchange.getOut().setBody(message);
+
+ private void doPublish(Exchange exchange, AsyncCallback callback) {
+ Object body = exchange.getIn().getBody();
+ if (ObjectHelper.isEmpty(body)) {
+ exchange.setException(new CamelException("Can not publish empty message"));
+ callback.done(true);
+ }
+ LOG.debug("Sending message [{}] to channel [{}]", body, getChannel(exchange));
+ endpoint.getPubnub()
+ .publish()
+ .message(body)
+ .channel(getChannel(exchange))
+ .usePOST(true)
+ .async(new PNCallback<PNPublishResult>() {
+ @Override
+ public void onResponse(PNPublishResult result, PNStatus status) {
+ if (!status.isError()) {
+ exchange.getIn().setHeader(PubNubConstants.TIMETOKEN, result.getTimetoken());
+ }
+ processMessage(exchange, callback, status, null);
}
- callback.done(false);
- }
+ });
+ }
+
+ private void doFire(Exchange exchange, AsyncCallback callback) {
+ Object body = exchange.getIn().getBody();
+ if (ObjectHelper.isEmpty(body)) {
+ exchange.setException(new CamelException("Can not fire empty message"));
+ callback.done(true);
+ }
+ LOG.debug("Sending message [{}] to channel [{}]", body, getChannel(exchange));
+ endpoint.getPubnub()
+ .fire()
+ .message(body)
+ .channel(getChannel(exchange))
+ .async(new PNCallback<PNPublishResult>() {
+ @Override
+ public void onResponse(PNPublishResult result, PNStatus status) {
+ if (!status.isError()) {
+ exchange.getIn().setHeader(PubNubConstants.TIMETOKEN, result.getTimetoken());
+ }
+ processMessage(exchange, callback, status, null);
+ }
+ });
+ }
+
+ private void doGetHistory(Exchange exchange, AsyncCallback callback) {
+ // @formatter:off
+ endpoint.getPubnub()
+ .history()
+ .channel(getChannel(exchange))
+ .async(new PNCallback<PNHistoryResult>() {
+ @Override
+ public void onResponse(PNHistoryResult result, PNStatus status) {
+ LOG.debug("Got history message [{}]", result);
+ processMessage(exchange, callback, status, result.getMessages());
+ }
+ });
+ // @formatter:on
+ }
+
+ private void doSetState(Exchange exchange, AsyncCallback callback) {
+ Object body = exchange.getIn().getBody();
+ if (ObjectHelper.isEmpty(body)) {
+ exchange.setException(new CamelException("Can not publish empty message"));
+ callback.done(true);
+ }
+ LOG.debug("Sending setState [{}] to channel [{}]", body, getChannel(exchange));
+ endpoint.getPubnub()
+ .setPresenceState()
+ .channels(Arrays.asList(getChannel(exchange)))
+ .state(body)
+ .uuid(getUUID(exchange))
+ .async(new PNCallback<PNSetStateResult>() {
+ public void onResponse(PNSetStateResult result, PNStatus status) {
+ LOG.debug("Got setState responsee [{}]", result);
+ processMessage(exchange, callback, status, result);
+ };
+ });
+ }
+
+ private void doGetState(Exchange exchange, AsyncCallback callback) {
+ // @formatter:off
+ endpoint.getPubnub()
+ .getPresenceState()
+ .channels(Arrays.asList(getChannel(exchange)))
+ .uuid(getUUID(exchange))
+ .async(new PNCallback<PNGetStateResult>() {
+ @Override
+ public void onResponse(PNGetStateResult result, PNStatus status) {
+ LOG.debug("Got state [{}]", result.getStateByUUID());
+ processMessage(exchange, callback, status, result.getStateByUUID());
+ }
+ });
+ // @formatter:on
+ }
+
+ private void doHereNow(Exchange exchange, AsyncCallback callback) {
+ endpoint.getPubnub()
+ .hereNow()
+ .channels(Arrays.asList(getChannel(exchange)))
+ .includeState(true)
+ .includeUUIDs(true)
+ .async(new PNCallback<PNHereNowResult>() {
+ @Override
+ public void onResponse(PNHereNowResult result, PNStatus status) {
+ LOG.debug("Got herNow message [{}]", result);
+ processMessage(exchange, callback, status, result);
+ }
+ });
+ }
- @Override
- public void errorCallback(String channel, PubnubError error) {
- exchange.setException(new CamelException(error.toString()));
- callback.done(false);
+ private void doWhereNow(Exchange exchange, AsyncCallback callback) {
+ // @formatter:off
+ endpoint.getPubnub()
+ .whereNow()
+ .uuid(getUUID(exchange))
+ .async(new PNCallback<PNWhereNowResult>() {
+ @Override
+ public void onResponse(PNWhereNowResult result, PNStatus status) {
+ LOG.debug("Got whereNow message [{}]", result.getChannels());
+ processMessage(exchange, callback, status, result.getChannels());
+ };
+ });
+ // @formatter:on
+ }
+
+ private void processMessage(Exchange exchange, AsyncCallback callback, PNStatus status, Object body) {
+ if (status.isError()) {
+ exchange.setException(status.getErrorData().getThrowable());
+ callback.done(true);
+ } else if (body != null) {
+ exchange.getIn().setBody(body);
+ }
+ if (exchange.getPattern().isOutCapable()) {
+ exchange.getOut().copyFrom(exchange.getIn());
+ if (body != null) {
+ exchange.getOut().setBody(body);
}
- };
- return pubnubCallback;
+ }
+
+ // signal exchange completion
+ callback.done(false);
}
private Operation getOperation(Exchange exchange) {
String operation = exchange.getIn().getHeader(PubNubConstants.OPERATION, String.class);
if (operation == null) {
- operation = endpoint.getOperation();
+ operation = pubnubConfiguration.getOperation();
}
return operation != null ? Operation.valueOf(operation) : Operation.PUBLISH;
}
+ private String getChannel(Exchange exchange) {
+ String channel = exchange.getIn().getHeader(PubNubConstants.CHANNEL, String.class);
+ return channel != null ? channel : pubnubConfiguration.getChannel();
+ }
+
+ private String getUUID(Exchange exchange) {
+ String uuid = exchange.getIn().getHeader(PubNubConstants.UUID, String.class);
+ return uuid != null ? uuid : pubnubConfiguration.getUuid();
+ }
+
private enum Operation {
- HERE_NOW, WHERE_NOW, GET_STATE, SET_STATE, GET_HISTORY, PUBLISH;
+ HERE_NOW, WHERE_NOW, GET_STATE, SET_STATE, GET_HISTORY, PUBLISH, FIRE;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-pubnub/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
deleted file mode 100644
index ebdd694..0000000
--- a/components/camel-pubnub/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.camel.component.pubnub
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubComponentTest.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubComponentTest.java
deleted file mode 100644
index 664bafe..0000000
--- a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubComponentTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.pubnub;
-
-import org.apache.camel.EndpointInject;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.json.JSONObject;
-import org.junit.Test;
-
-public class PubNubComponentTest extends CamelTestSupport {
- private String endpoint = "pubnub:pubsub:someChannel?pubnub=#pubnub";
-
- @EndpointInject(uri = "mock:result")
- private MockEndpoint mockResult;
-
- @Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry registry = super.createRegistry();
- registry.bind("pubnub", new PubNubMock("dummy", "dummy"));
- return registry;
- }
-
- @Test
- public void testPubNub() throws Exception {
- mockResult.expectedMessageCount(1);
- mockResult.expectedHeaderReceived("CamelPubNubChannel", "someChannel");
- mockResult.expectedBodiesReceived("{\"hi\":\"there\"}");
- JSONObject jo = new JSONObject();
- jo.put("hi", "there");
- template.sendBody("direct:publish", jo);
- assertMockEndpointsSatisfied();
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() {
- from(endpoint).to("mock:result");
- from("direct:publish").to(endpoint);
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubConfigurationTest.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubConfigurationTest.java
index 23e5c56..20ddd24 100644
--- a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubConfigurationTest.java
+++ b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubConfigurationTest.java
@@ -24,32 +24,33 @@ public class PubNubConfigurationTest extends CamelTestSupport {
@Test(expected = IllegalArgumentException.class)
public void createEndpointWithIllegalArguments() throws Exception {
PubNubComponent component = new PubNubComponent(context);
- component.createEndpoint("pubnub:XXX:xxx");
+ component.createEndpoint("pubnub");
}
@Test
public void createEndpointWithMinimalConfiguration() throws Exception {
PubNubComponent component = new PubNubComponent(context);
- PubNubEndpoint endpoint = (PubNubEndpoint) component.createEndpoint("pubnub://pubsub:xxx?subscriberKey=mysubkey");
+ PubNubEndpoint endpoint = (PubNubEndpoint) component.createEndpoint("pubnub:xxx?subscriberKey=mysubkey");
- assertEquals("xxx", endpoint.getChannel());
- assertEquals("mysubkey", endpoint.getSubscriberKey());
- assertTrue(endpoint.isSsl());
+ assertEquals("xxx", endpoint.getConfiguration().getChannel());
+ assertEquals("mysubkey", endpoint.getConfiguration().getSubscriberKey());
+ assertTrue(endpoint.getConfiguration().isSecure());
}
@Test
public void createEndpointWithMaximalConfiguration() throws Exception {
PubNubComponent component = new PubNubComponent(context);
PubNubEndpoint endpoint = (PubNubEndpoint)component
- .createEndpoint("pubnub://pubsub:xxx?subscriberKey=mysubkey&publisherKey=mypubkey&secretKey=secrets&uuid=myuuid&operation=PUBLISH&ssl=false");
-
- assertEquals("xxx", endpoint.getChannel());
- assertEquals("mysubkey", endpoint.getSubscriberKey());
- assertEquals("mypubkey", endpoint.getPublisherKey());
- assertEquals("secrets", endpoint.getSecretKey());
- assertEquals("myuuid", endpoint.getUuid());
- assertEquals("PUBLISH", endpoint.getOperation());
- assertFalse(endpoint.isSsl());
+ .createEndpoint("pubnub:xxx?subscriberKey=mysubkey&publisherKey=mypubkey&secretKey=secrets&uuid=myuuid&operation=PUBLISH&secure=false&authKey=authKey");
+
+ assertEquals("xxx", endpoint.getConfiguration().getChannel());
+ assertEquals("mysubkey", endpoint.getConfiguration().getSubscriberKey());
+ assertEquals("mypubkey", endpoint.getConfiguration().getPublisherKey());
+ assertEquals("secrets", endpoint.getConfiguration().getSecretKey());
+ assertEquals("myuuid", endpoint.getConfiguration().getUuid());
+ assertEquals("PUBLISH", endpoint.getConfiguration().getOperation());
+ assertEquals("authKey", endpoint.getConfiguration().getAuthKey());
+ assertFalse(endpoint.getConfiguration().isSecure());
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubEmptyPayloadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubEmptyPayloadTest.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubEmptyPayloadTest.java
deleted file mode 100644
index d641259..0000000
--- a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubEmptyPayloadTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.pubnub;
-
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-
-public class PubNubEmptyPayloadTest extends CamelTestSupport {
- private final String endpoint = "pubnub:pubsub:someChannel?pubnub=#pubnub";
-
- @EndpointInject(uri = "mock:result")
- private MockEndpoint mockResult;
-
- @Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry registry = super.createRegistry();
- registry.bind("pubnub", new PubNubMock("dummy", "dummy"));
- return registry;
- }
-
- @Test(expected = CamelExecutionException.class)
- public void testPubNub() throws Exception {
- template.sendBody("direct:publish", null);
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() {
- from("direct:publish").to(endpoint);
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubMock.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubMock.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubMock.java
deleted file mode 100644
index f7b6917..0000000
--- a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubMock.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.pubnub;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import com.pubnub.api.Callback;
-import com.pubnub.api.Pubnub;
-import com.pubnub.api.PubnubException;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
-
-public class PubNubMock extends Pubnub {
- private static Map<String, Callback> subscribers = new ConcurrentHashMap<String, Callback>();
- private static Map<String, Callback> presenceSubscribers = new ConcurrentHashMap<String, Callback>();
- private static Map<String, JSONObject> stateMap = new ConcurrentHashMap<String, JSONObject>();
- private final ExecutorService executorService = Executors.newFixedThreadPool(3);
-
- public PubNubMock(String publishKey, String subscribeKey) {
- super(publishKey, subscribeKey);
- }
-
- @Override
- public void subscribe(String channel, Callback callback) throws PubnubException {
- subscribers.put(channel, callback);
- executorService.execute(() -> {
- try {
- Thread.sleep(500);
- callback.connectCallback(channel, "OK");
- } catch (InterruptedException e) {
- }
- });
- Callback presenceCallback = presenceSubscribers.get(channel);
- if (presenceCallback != null) {
- executorService.execute(() -> {
- try {
- Thread.sleep(500);
- String presence = "{\"action\":\"join\",\"timestamp\":1431777382,\"uuid\":\"d08f121b-d146-45af-a814-058c1b7d283a\",\"occupancy\":1}";
- presenceCallback.successCallback(channel, new JSONObject(presence), "" + System.currentTimeMillis());
- } catch (Exception e) {
- }
- });
- }
- }
-
- @Override
- public void publish(String channel, JSONObject message, Callback callback) {
- callback.successCallback(channel, "OK");
- Callback clientMockCallback = subscribers.get(channel);
- if (clientMockCallback != null) {
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- clientMockCallback.successCallback(channel, message, "" + System.currentTimeMillis());
- } catch (InterruptedException e) {
- }
- });
- }
- }
-
- @Override
- public void publish(String channel, JSONArray message, Callback callback) {
- callback.successCallback(channel, "OK");
- Callback clientMockCallback = subscribers.get(channel);
- if (clientMockCallback != null) {
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- clientMockCallback.successCallback(channel, message, "" + System.currentTimeMillis());
- } catch (InterruptedException e) {
- }
- });
- }
- }
-
- @Override
- public void publish(String channel, String message, Callback callback) {
- callback.successCallback(channel, "OK");
- Callback clientMockCallback = subscribers.get(channel);
- if (clientMockCallback != null) {
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- clientMockCallback.successCallback(channel, message, "" + System.currentTimeMillis());
- } catch (InterruptedException e) {
- }
- });
- }
- }
-
- @Override
- public void presence(String channel, Callback callback) throws PubnubException {
- presenceSubscribers.put(channel, callback);
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- callback.connectCallback(channel, "OK");
- } catch (InterruptedException e) {
- }
- });
- }
-
- @Override
- public void history(String channel, boolean reverse, Callback callback) {
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- callback.successCallback(channel, new JSONArray("[[\"message1\", \"message2\", \"message3\"],\"Start Time Token\",\"End Time Token\"]"));
- } catch (Exception e) {
- }
- });
- }
-
- @Override
- public void setState(String channel, String uuid, JSONObject state, Callback callback) {
- stateMap.put(uuid, state);
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- callback.successCallback(channel, "OK");
- } catch (Exception e) {
- }
- });
- }
-
- @Override
- public void getState(String channel, String uuid, Callback callback) {
- JSONObject jsonObject = stateMap.get(uuid);
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- callback.successCallback(channel, jsonObject);
- } catch (Exception e) {
- }
- });
- }
-
- @Override
- public void hereNow(String channel, boolean state, boolean uuids, Callback callback) {
-
- executorService.execute(() -> {
- try {
- Thread.sleep(500);
- //@formatter:off
- JSONObject response = new JSONObject("{\"uuids\":[\"76c2c571-9a2b-d074-b4f8-e93e09f49bd\","
- + "\"175c2c67-b2a9-470d-8f4b-1db94f90e39e\", "
- + "\"2c67175c-2a9b-074d-4b8f-90e39e1db94f\"],"
- + "\"occupancy\":3 }");
- //@formatter:on
- callback.successCallback(channel, response);
- } catch (Exception e) {
- }
-
- });
- }
-
- @Override
- public void whereNow(String uuid, Callback callback) {
- executorService.execute(() -> {
- try {
- Thread.sleep(1000);
- callback.successCallback("channel", new JSONObject("{\"channels\":[\"hello_world\"]}"));
- } catch (Exception e) {
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubOperationsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubOperationsTest.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubOperationsTest.java
index c918009..d0406db 100644
--- a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubOperationsTest.java
+++ b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubOperationsTest.java
@@ -16,75 +16,125 @@
*/
package org.apache.camel.component.pubnub;
+
+
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import com.google.gson.JsonObject;
+import com.pubnub.api.models.consumer.history.PNHistoryItemResult;
+import com.pubnub.api.models.consumer.presence.PNHereNowResult;
+import com.pubnub.api.models.consumer.presence.PNSetStateResult;
+
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.json.JSONArray;
-import org.json.JSONObject;
import org.junit.Test;
-public class PubNubOperationsTest extends CamelTestSupport {
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+
+public class PubNubOperationsTest extends PubNubTestBase {
@Test
public void testWhereNow() throws Exception {
+ stubFor(get(urlPathEqualTo("/v2/presence/sub-key/mySubscribeKey/uuid/myUUID"))
+ .willReturn(aResponse().withBody("{\"status\": 200, \"message\": \"OK\", \"payload\": {\"channels\": [\"channel-a\",\"channel-b\"]}, \"service\": \"Presence\"}")));
+
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(PubNubConstants.OPERATION, "WHERE_NOW");
- headers.put(PubNubConstants.UUID, "uuid");
- JSONObject response = template.requestBodyAndHeaders("direct:publish", null, headers, JSONObject.class);
+ headers.put(PubNubConstants.UUID, "myUUID");
+ @SuppressWarnings("unchecked")
+ List<String> response = template.requestBodyAndHeaders("direct:publish", null, headers, List.class);
assertNotNull(response);
- assertEquals("hello_world", response.getJSONArray("channels").getString(0));
+ assertListSize(response, 2);
+ assertEquals("channel-a", response.get(0));
}
@Test
public void testHereNow() throws Exception {
+ stubFor(get(urlPathEqualTo("/v2/presence/sub_key/mySubscribeKey/channel/myChannel")).willReturn(aResponse()
+ .withBody("{\"status\" : 200, \"message\" : \"OK\", \"service\" : \"Presence\", \"uuids\" : [{\"uuid\" : \"myUUID0\"}, {\"state\" : {\"abcd\" : {\"age\" : 15}}, "
+ + "\"uuid\" : \"myUUID1\"}, {\"uuid\" : \"b9eb408c-bcec-4d34-b4c4-fabec057ad0d\"}, {\"state\" : {\"abcd\" : {\"age\" : 15}}, \"uuid\" : \"myUUID2\"},"
+ + " {\"state\" : {\"abcd\" : {\"age\" : 24}}, \"uuid\" : \"myUUID9\"}], \"occupancy\" : 5} Return Occupancy O")));
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(PubNubConstants.OPERATION, "HERE_NOW");
- JSONObject response = template.requestBodyAndHeaders("direct:publish", null, headers, JSONObject.class);
+ PNHereNowResult response = template.requestBodyAndHeaders("direct:publish", null, headers, PNHereNowResult.class);
assertNotNull(response);
- assertEquals(3, response.getInt("occupancy"));
+ assertEquals(5, response.getTotalOccupancy());
}
@Test
public void testGetHistory() throws Exception {
+ List<Object> testArray = new ArrayList<>();
+ List<Object> historyItems = new ArrayList<>();
+
+ Map<String, Object> historyEnvelope1 = new HashMap<>();
+ Map<String, Object> historyItem1 = new HashMap<>();
+ historyItem1.put("a", 11);
+ historyItem1.put("b", 22);
+ historyEnvelope1.put("timetoken", 1111);
+ historyEnvelope1.put("message", historyItem1);
+
+ Map<String, Object> historyEnvelope2 = new HashMap<>();
+ Map<String, Object> historyItem2 = new HashMap<>();
+ historyItem2.put("a", 33);
+ historyItem2.put("b", 44);
+ historyEnvelope2.put("timetoken", 2222);
+ historyEnvelope2.put("message", historyItem2);
+
+ historyItems.add(historyEnvelope1);
+ historyItems.add(historyEnvelope2);
+
+ testArray.add(historyItems);
+ testArray.add(1234);
+ testArray.add(4321);
+
+ stubFor(get(urlPathEqualTo("/v2/history/sub-key/mySubscribeKey/channel/myChannel")).willReturn(aResponse().withBody(getPubnub().getMapper().toJson(testArray))));
+
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(PubNubConstants.OPERATION, "GET_HISTORY");
- JSONArray response = template.requestBodyAndHeaders("direct:publish", null, headers, JSONArray.class);
+ @SuppressWarnings("unchecked")
+ List<PNHistoryItemResult> response = template.requestBodyAndHeaders("direct:publish", null, headers, List.class);
assertNotNull(response);
- assertEquals("message1", response.getJSONArray(0).getString(0));
+ assertListSize(response, 2);
}
@Test
- public void testSetAndGetState() throws Exception {
+ public void testGetState() throws Exception {
+ stubFor(get(urlPathEqualTo("/v2/presence/sub-key/mySubscribeKey/channel/myChannel/uuid/myuuid")).willReturn(aResponse()
+ .withBody("{ \"status\": 200, \"message\": \"OK\", \"payload\": "
+ + "{ \"myChannel\": { \"age\" : 20, \"status\" : \"online\"}, \"ch2\": { \"age\": 100, \"status\": \"offline\" } }, \"service\": \"Presence\"}")));
Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(PubNubConstants.OPERATION, "SET_STATE");
- headers.put(PubNubConstants.UUID, "myuuid");
- JSONObject state = new JSONObject("{\"state\":\"active\", \"lat\":\"55.645499\", \"lon\":\"12.370967\"}");
- template.sendBodyAndHeaders("direct:publish", state, headers);
- headers.replace(PubNubConstants.OPERATION, "GET_STATE");
- JSONObject response = template.requestBodyAndHeaders("direct:publish", null, headers, JSONObject.class);
+ headers.put(PubNubConstants.OPERATION, "GET_STATE");
+ @SuppressWarnings("unchecked")
+ Map<String, JsonObject> response = template.requestBodyAndHeaders("direct:publish", null, headers, Map.class);
assertNotNull(response);
- assertEquals(state, response);
+ assertNotNull(response.get("myChannel"));
}
- @Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry registry = super.createRegistry();
- registry.bind("pubnub", new PubNubMock("dummy", "dummy"));
- return registry;
+ @Test
+ public void testSetState() throws Exception {
+ stubFor(get(urlPathEqualTo("/v2/presence/sub-key/mySubscribeKey/channel/myChannel/uuid/myuuid/data"))
+ .willReturn(aResponse().withBody("{ \"status\": 200, \"message\": \"OK\", \"payload\": { \"age\" : 20, \"status\" : \"online\" }, \"service\": \"Presence\"}")));
+ Map<String, Object> myState = new HashMap<>();
+ myState.put("age", 20);
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(PubNubConstants.OPERATION, "SET_STATE");
+ PNSetStateResult response = template.requestBodyAndHeaders("direct:publish", myState, headers, PNSetStateResult.class);
+ assertNotNull(response);
+ assertNotNull(response.getState());
+ assertEquals(20, response.getState().getAsJsonObject().get("age").getAsInt());
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- //@formatter:off
- from("direct:publish").to("pubnub://pubsub:mychannel?uuid=myuuid&pubnub=#pubnub")
- .to("log:io.rhiot.component.pubnub?showAll=true&multiline=true")
- .to("mock:result");
- //@formatter:on
+ from("direct:publish").to("pubnub:myChannel?uuid=myuuid&pubnub=#pubnub")
+ .to("mock:result");
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPresensTest.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPresensTest.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPresensTest.java
index b794537..252a894 100644
--- a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPresensTest.java
+++ b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPresensTest.java
@@ -16,55 +16,53 @@
*/
package org.apache.camel.component.pubnub;
-import com.pubnub.api.Callback;
+import com.pubnub.api.models.consumer.pubsub.PNPresenceEventResult;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.impl.JndiRegistry;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.json.JSONObject;
import org.junit.Test;
-public class PubNubPresensTest extends CamelTestSupport {
- boolean connected;
- private PubNubMock pubnubMock = new PubNubMock("foo", "bar");
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+
+public class PubNubPresensTest extends PubNubTestBase {
@EndpointInject(uri = "mock:result")
private MockEndpoint mockResult;
@Test
public void testPresens() throws Exception {
+ stubFor(get(urlPathEqualTo("/v2/presence/sub-key/mySubscribeKey/channel/mychannel/heartbeat"))
+ .willReturn(aResponse().withBody("{\"status\": 200, \"message\": \"OK\", \"service\": \"Presence\"}")));
+
+ stubFor(get(urlPathEqualTo("/v2/subscribe/mySubscribeKey/mychannel,mychannel-pnpres/0"))
+ .willReturn(aResponse()
+ .withBody("{\"t\":{\"t\":\"14637536741734954\",\"r\":1},\"m\":[{\"a\":\"4\",\"f\":512,\"p\":{\"t\":\"14637536740940378\",\"r\":1},\"k\":\"demo-36\",\"c\":\"mychannel-pnpres\","
+ + "\"d\":{\"action\": \"join\", \"timestamp\": 1463753674, \"uuid\": \"24c9bb19-1fcd-4c40-a6f1-522a8a1329ef\", \"occupancy\": 3},\"b\":\"mychannel-pnpres\"},"
+ + "{\"a\":\"4\",\"f\":512,\"p\":{\"t\":\"14637536741726901\",\"r\":1},\"k\":\"demo-36\",\"c\":\"mychannel-pnpres\",\"d\":{\"action\": \"state-change\", "
+ + "\"timestamp\": 1463753674, \"data\": {\"state\": \"cool\"}, \"uuid\": \"24c9bb19-1fcd-4c40-a6f1-522a8a1329ef\", \"occupancy\": 3},\"b\":\"mychannel-pnpres\"}]}")));
+ context.startRoute("presence-route");
mockResult.expectedMessageCount(1);
mockResult.expectedHeaderReceived(PubNubConstants.CHANNEL, "mychannel");
- pubnubMock.subscribe("mychannel", new Callback() {
- @Override
- public void connectCallback(String channel, Object message) {
- connected = true;
- }
- });
assertMockEndpointsSatisfied();
- assertTrue(connected);
- JSONObject presenceResponse = mockResult.getReceivedExchanges().get(0).getIn().getBody(JSONObject.class);
- assertEquals("join", presenceResponse.getString("action"));
+ PNPresenceEventResult presence = mockResult.getReceivedExchanges().get(0).getIn().getBody(PNPresenceEventResult.class);
+ assertThat(presence.getEvent(), equalTo("join"));
}
- @Override
- protected JndiRegistry createRegistry() throws Exception {
- JndiRegistry registry = super.createRegistry();
- registry.bind("pubnub", new PubNubMock("dummy", "dummy"));
- return registry;
- }
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() {
- //@formatter:off
- from("pubnub://presence:mychannel?pubnub=#pubnub")
- .to("log:org.apache.camel.component.pubnub?showAll=true&multiline=true")
+ from("pubnub:mychannel?pubnub=#pubnub&withPresence=true").id("presence-route")
+ .autoStartup(false)
.to("mock:result");
- //@formatter:on
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPublishTest.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPublishTest.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPublishTest.java
new file mode 100644
index 0000000..8b3d9f0
--- /dev/null
+++ b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubPublishTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pubnub;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+
+import static org.apache.camel.component.pubnub.PubNubConstants.TIMETOKEN;
+
+public class PubNubPublishTest extends PubNubTestBase {
+ private String endpoint = "pubnub:someChannel?pubnub=#pubnub";
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mockResult;
+
+ @Test
+ public void testPubNub() throws Exception {
+ stubFor(post(urlPathEqualTo("/publish/myPublishKey/mySubscribeKey/0/someChannel/0")).willReturn(aResponse().withStatus(200).withBody("[1,\"Sent\",\"14598111595318003\"]")));
+ mockResult.expectedMessageCount(1);
+ mockResult.expectedHeaderReceived(TIMETOKEN, "14598111595318003");
+ template.sendBody("direct:publish", new Hello("Hi"));
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test(expected = CamelExecutionException.class)
+ public void testPubNubException() throws Exception {
+ stubFor(post(urlPathEqualTo("/publish/myPublishKey/mySubscribeKey/0/someChannel/0")).willReturn(aResponse().withStatus(401).withBody("[0,\"Blabla\",\"14598111595318003\"]")));
+
+ template.sendBody("direct:publish", new Hello("Hi"));
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test(expected = CamelExecutionException.class)
+ public void testPublishEmptyBody() throws Exception {
+ template.sendBody("direct:publish", null);
+ }
+
+ @Test
+ public void testFire() throws Exception {
+ stubFor(get(urlPathEqualTo("/publish/myPublishKey/mySubscribeKey/0/someChannel/0/%22Hi%22"))
+ .willReturn(aResponse().withBody("[1,\"Sent\",\"14598111595318003\"]")));
+ mockResult.expectedMessageCount(1);
+ mockResult.expectedHeaderReceived(TIMETOKEN, "14598111595318003");
+
+ template.sendBodyAndHeader("direct:publish", "Hi", PubNubConstants.OPERATION, "FIRE");
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:publish").to(endpoint).to("mock:result");
+ }
+ };
+ }
+
+ static class Hello {
+ private String message;
+
+ Hello(String message) {
+ this.message = message;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubSubscriberTest.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubSubscriberTest.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubSubscriberTest.java
new file mode 100644
index 0000000..5202401
--- /dev/null
+++ b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubSubscriberTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pubnub;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
+
+
+public class PubNubSubscriberTest extends PubNubTestBase {
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mockResult;
+
+ @Test
+ public void testPubSubMessageSubscribe() throws Exception {
+ stubFor(get(urlPathEqualTo("/v2/subscribe/mySubscribeKey/mychannel/0"))
+ .willReturn(aResponse()
+ .withBody("{\"t\":{\"t\":\"14607577960932487\",\"r\":1},\"m\":[{\"a\":\"4\",\"f\":0,\"i\":\"Publisher-A\",\"p\":{\"t\":\"14607577960925503\",\"r\":1},\"o\":"
+ + "{\"t\":\"14737141991877032\",\"r\":2},\"k\":\"sub-c-4cec9f8e-01fa-11e6-8180-0619f8945a4f\",\"c\":\"mychannel\",\"d\":{\"text\":\"Message\"},\"b\":\"coolChannel\"}]}")));
+ stubFor(get(urlPathEqualTo("/v2/presence/sub-key/mySubscribeKey/channel/mychannel/heartbeat"))
+ .willReturn(aResponse().withBody("{\"status\": 200, \"message\": \"OK\", \"service\": \"Presence\"}")));
+
+ context.startRoute("subroute");
+ mockResult.expectedMessageCount(1);
+ mockResult.expectedHeaderReceived(PubNubConstants.CHANNEL, "mychannel");
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("pubnub:mychannel?pubnub=#pubnub").id("subroute").autoStartup(false)
+ .to("mock:result");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubTestBase.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubTestBase.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubTestBase.java
new file mode 100644
index 0000000..647ca49
--- /dev/null
+++ b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/PubNubTestBase.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pubnub;
+
+import java.io.IOException;
+
+import com.github.tomakehurst.wiremock.junit.WireMockRule;
+import com.pubnub.api.PNConfiguration;
+import com.pubnub.api.PubNub;
+import com.pubnub.api.enums.PNLogVerbosity;
+
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+import static com.pubnub.api.enums.PNHeartbeatNotificationOptions.NONE;
+
+public class PubNubTestBase extends CamelTestSupport {
+ private final int port = AvailablePortFinder.getNextAvailable(3344);
+
+ private PubNub pubnub;
+
+ @Rule
+ public WireMockRule wireMockRule = new WireMockRule(options().port(port));
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ this.pubnub = createPubNubInstance();
+ registry.bind("pubnub", this.pubnub);
+ return registry;
+ }
+
+
+ @Before
+ public void beforeEach() throws IOException {
+ wireMockRule.start();
+ }
+
+ @After
+ public void afterEach() {
+ pubnub.destroy();
+ }
+
+ protected PubNub getPubnub() {
+ return pubnub;
+ }
+
+ private PubNub createPubNubInstance() {
+ PNConfiguration pnConfiguration = new PNConfiguration();
+
+ pnConfiguration.setOrigin("localhost" + ":" + port);
+ pnConfiguration.setSecure(false);
+ pnConfiguration.setSubscribeKey("mySubscribeKey");
+ pnConfiguration.setPublishKey("myPublishKey");
+ pnConfiguration.setUuid("myUUID");
+ pnConfiguration.setLogVerbosity(PNLogVerbosity.NONE);
+ pnConfiguration.setHeartbeatNotificationOptions(NONE);
+ class MockedTimePubNub extends PubNub {
+
+ MockedTimePubNub(PNConfiguration initialConfig) {
+ super(initialConfig);
+ }
+
+ @Override
+ public int getTimestamp() {
+ return 1337;
+ }
+
+ @Override
+ public String getVersion() {
+ return "suchJava";
+ }
+
+ @Override
+ public String getInstanceId() {
+ return "PubNubInstanceId";
+ }
+
+ @Override
+ public String getRequestId() {
+ return "PubNubRequestId";
+ }
+
+ }
+
+ return new MockedTimePubNub(pnConfiguration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/13042923/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/example/PubNubExampleConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/example/PubNubExampleConstants.java b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/example/PubNubExampleConstants.java
index f24c917..4e7209d 100644
--- a/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/example/PubNubExampleConstants.java
+++ b/components/camel-pubnub/src/test/java/org/apache/camel/component/pubnub/example/PubNubExampleConstants.java
@@ -19,6 +19,6 @@ package org.apache.camel.component.pubnub.example;
public interface PubNubExampleConstants {
// replace subscriber+publisher key with one obtained from PubNub.
// http://www.pubnub.com
- String PUBNUB_SUBSCRIBER_KEY = "mysubkey";
- String PUBNUB_PUBLISHER_KEY = "mypubkey";
+ String PUBNUB_SUBSCRIBER_KEY = "subkey";
+ String PUBNUB_PUBLISHER_KEY = "pubkey";
}