You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/04/03 12:20:15 UTC
[3/3] camel git commit: CAMEL-9803: Camel-NATS: Switch to Jnats client as Java_nats is deprecated
CAMEL-9803: Camel-NATS: Switch to Jnats client as Java_nats is deprecated
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66f0fe84
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66f0fe84
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66f0fe84
Branch: refs/heads/master
Commit: 66f0fe84c5a4dece014660ce4ebaf3e01fac94ec
Parents: 431ee2b
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sun Apr 3 12:02:38 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sun Apr 3 12:19:54 2016 +0200
----------------------------------------------------------------------
components/camel-nats/pom.xml | 6 +-
.../camel/component/nats/NatsConfiguration.java | 12 +--
.../camel/component/nats/NatsConsumer.java | 78 ++++++++++++++------
.../camel/component/nats/NatsProducer.java | 12 ++-
.../component/nats/NatsPropertiesConstants.java | 16 ++--
.../component/nats/NatsConsumerLoadTest.java | 14 ++--
.../nats/NatsConsumerMaxMessagesQueueTest.java | 2 +-
.../nats/NatsConsumerMaxMessagesTest.java | 2 +-
.../camel/component/nats/NatsConsumerTest.java | 2 +-
.../camel/component/nats/NatsProducerTest.java | 1 +
parent/pom.xml | 3 +-
.../features/src/main/resources/features.xml | 2 +-
12 files changed, 96 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-nats/pom.xml b/components/camel-nats/pom.xml
index 6ca8079..c2fd1dc 100644
--- a/components/camel-nats/pom.xml
+++ b/components/camel-nats/pom.xml
@@ -34,9 +34,9 @@
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
- <groupId>com.github.tyagihas</groupId>
- <artifactId>java_nats</artifactId>
- <version>${java-nats-version}</version>
+ <groupId>io.nats</groupId>
+ <artifactId>jnats</artifactId>
+ <version>${jnats-version}</version>
</dependency>
<!-- testing -->
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 260d1a7..1618eb9 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -218,12 +218,12 @@ public class NatsConfiguration {
return props;
}
- public Properties createSubProperties() {
- Properties props = new Properties();
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
- return props;
- }
+// public Properties createSubProperties() {
+// Properties props = new Properties();
+// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
+// addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
+// return props;
+// }
private String splitServers() {
StringBuilder servers = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 9c8a29d..8be0aea 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -19,15 +19,21 @@ package org.apache.camel.component.nats;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
-import org.nats.Connection;
-import org.nats.MsgHandler;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
+import io.nats.client.Message;
+import io.nats.client.MessageHandler;
+import io.nats.client.Subscription;
+
public class NatsConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class);
@@ -35,7 +41,7 @@ public class NatsConsumer extends DefaultConsumer {
private final Processor processor;
private ExecutorService executor;
private Connection connection;
- private int sid;
+ private Subscription sid;
public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -67,7 +73,7 @@ public class NatsConsumer extends DefaultConsumer {
connection.flush();
try {
- connection.unsubscribe(sid);
+ sid.unsubscribe();
} catch (Exception e) {
getExceptionHandler().handleException("Error during unsubscribing", e);
}
@@ -83,14 +89,15 @@ public class NatsConsumer extends DefaultConsumer {
executor = null;
LOG.debug("Closing Nats Connection");
- if (connection.isConnected()) {
+ if (!connection.isClosed()) {
connection.close();
}
}
- private Connection getConnection() throws IOException, InterruptedException {
+ private Connection getConnection() throws IOException, InterruptedException, TimeoutException {
Properties prop = getEndpoint().getNatsConfiguration().createProperties();
- connection = Connection.connect(prop);
+ ConnectionFactory factory = new ConnectionFactory(prop);
+ connection = factory.createConnection();
return connection;
}
@@ -107,23 +114,50 @@ public class NatsConsumer extends DefaultConsumer {
@Override
public void run() {
try {
- sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), configuration.createSubProperties(), new MsgHandler() {
- public void execute(String msg) {
- LOG.debug("Received Message: {}", msg);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(msg);
- exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
- exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
- try {
- processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
+ if (ObjectHelper.isNotEmpty(configuration.getQueueName())) {
+ sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() {
+
+ @Override
+ public void onMessage(Message msg) {
+ LOG.debug("Received Message: {}", msg);
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(msg);
+ exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+ exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
}
+ });
+ if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
+ sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
}
- });
- } catch (Throwable e) {
- getExceptionHandler().handleException("Error during processing", e);
- }
+ } else {
+ sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() {
+
+ @Override
+ public void onMessage(Message msg) {
+ LOG.debug("Received Message: {}", msg);
+ Exchange exchange = getEndpoint().createExchange();
+ exchange.getIn().setBody(msg);
+ exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+ exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ }
+ }
+ });
+ if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
+ sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
+ }
+ }
+ } catch (Throwable e) {
+ getExceptionHandler().handleException("Error during processing", e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 89b2b23..2e92f44 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -18,13 +18,16 @@ package org.apache.camel.component.nats;
import java.io.IOException;
import java.util.Properties;
+import java.util.concurrent.TimeoutException;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
-import org.nats.Connection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
+
public class NatsProducer extends DefaultProducer {
private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);
@@ -64,14 +67,15 @@ public class NatsProducer extends DefaultProducer {
LOG.debug("Stopping Nats Producer");
LOG.debug("Closing Nats Connection");
- if (connection != null && connection.isConnected()) {
+ if (connection != null && !connection.isClosed()) {
connection.close();
}
}
- private Connection getConnection() throws IOException, InterruptedException {
+ private Connection getConnection() throws TimeoutException, IOException {
Properties prop = getEndpoint().getNatsConfiguration().createProperties();
- connection = Connection.connect(prop);
+ ConnectionFactory factory = new ConnectionFactory(prop);
+ connection = factory.createConnection();
return connection;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
index 8c09ce8..2e09361 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
@@ -18,14 +18,14 @@ package org.apache.camel.component.nats;
public interface NatsPropertiesConstants {
String NATS_PROPERTY_URI = "uri";
- String NATS_PROPERTY_VERBOSE = "verbose";
- String NATS_PROPERTY_PEDANTIC = "pedantic";
- String NATS_PROPERTY_RECONNECT = "reconnect";
- String NATS_PROPERTY_SSL = "ssl";
- String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "max_reconnect_attempts";
- String NATS_PROPERTY_RECONNECT_TIME_WAIT = "reconnect_time_wait";
- String NATS_PROPERTY_PING_INTERVAL = "ping_interval";
- String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "dont_randomize_servers";
+ String NATS_PROPERTY_VERBOSE = "io.nats.client.verbose";
+ String NATS_PROPERTY_PEDANTIC = "io.nats.client.pedantic";
+ String NATS_PROPERTY_RECONNECT = "io.nats.client.reconnect.allowed";
+ String NATS_PROPERTY_SSL = "io.nats.client.secure";
+ String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "io.nats.client.reconnect.max";
+ String NATS_PROPERTY_RECONNECT_TIME_WAIT = "io.nats.client.reconnect.wait";
+ String NATS_PROPERTY_PING_INTERVAL = "io.nats.client.pinginterval";
+ String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "io.nats.client.norandomize";
String NATS_PROPERTY_QUEUE = "queue";
String NATS_PROPERTY_MAX_MESSAGES = "max";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
index 67d1e7c..87d0c3e 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.nats;
import java.io.IOException;
import java.util.Properties;
+import java.util.concurrent.TimeoutException;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
@@ -25,7 +26,9 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Ignore;
import org.junit.Test;
-import org.nats.Connection;
+
+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
@Ignore("Require a running Nats server")
public class NatsConsumerLoadTest extends CamelTestSupport {
@@ -34,11 +37,11 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
protected MockEndpoint mockResultEndpoint;
@Test
- public void testLoadConsumer() throws InterruptedException, IOException {
+ public void testLoadConsumer() throws InterruptedException, IOException, TimeoutException {
mockResultEndpoint.setExpectedMessageCount(10000);
-
- Connection connection = Connection.connect(new Properties());
-
+ ConnectionFactory cf = new ConnectionFactory("nats://localhost:4222");
+ Connection connection = cf.createConnection();
+
for (int i = 0; i < 10000; i++) {
connection.publish("test", ("test" + i).getBytes());
}
@@ -51,6 +54,7 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ from("direct:send").to("nats://localhost:4222?topic=test");
from("nats://localhost:4222?topic=test").to(mockResultEndpoint);
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
index c637cef..b69a6b7 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
@@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesQueueTest extends CamelTestSupport {
@Test
public void testMaxConsumer() throws InterruptedException, IOException {
- mockResultEndpoint.expectedBodiesReceivedInAnyOrder("test", "test1");
+ mockResultEndpoint.expectedBodiesReceivedInAnyOrder("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}");
mockResultEndpoint.setExpectedMessageCount(2);
template.sendBody("direct:send", "test");
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
index 6e7482e..5ee94d9 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
@@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport {
@Test
public void testMaxConsumer() throws InterruptedException, IOException {
- mockResultEndpoint.expectedBodiesReceived("test", "test1", "test2", "test3", "test4");
+ mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}");
mockResultEndpoint.setExpectedMessageCount(5);
template.sendBody("direct:send", "test");
template.sendBody("direct:send", "test1");
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index c689ade..ca63048 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -34,7 +34,7 @@ public class NatsConsumerTest extends CamelTestSupport {
@Test
public void testConsumer() throws InterruptedException, IOException {
mockResultEndpoint.expectedMessageCount(1);
- mockResultEndpoint.expectedBodiesReceived("test");
+ mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}");
template.requestBody("direct:send", "test");
mockResultEndpoint.assertIsSatisfied();
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
index 8f10b4c..4a22551 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
@@ -26,6 +26,7 @@ public class NatsProducerTest extends CamelTestSupport {
@Test
public void sendTest() throws Exception {
+
template.sendBody("direct:send", "pippo");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index b6dde19..386a760 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -267,8 +267,7 @@
<java-apns-bundle-version>1.0.0.Beta6_1</java-apns-bundle-version>
<java-apns-version>1.0.0.Beta6</java-apns-version>
<java-ewah-version>0.7.9</java-ewah-version>
- <java-nats-version>0.5.2</java-nats-version>
- <java-nats-bundle-version>0.5.2_1</java-nats-bundle-version>
+ <jnats-version>0.4.0</jnats-version>
<javacc-maven-plugin-version>2.6</javacc-maven-plugin-version>
<javacrumbs-version>0.22</javacrumbs-version>
<javassist-bundle-version>3.12.1.GA_3</javassist-bundle-version>
http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index dd084e9..e5c0ea9 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1152,7 +1152,7 @@
</feature>
<feature name='camel-nats' version='${project.version}' resolver='(obr)' start-level='50'>
<feature version='${project.version}'>camel-core</feature>
- <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.java_nats/${java-nats-bundle-version}</bundle>
+ <bundle dependency='true'>wrap:mvn:io.nats/jnats/${jnats-version}</bundle>
<bundle>mvn:org.apache.camel/camel-nats/${project.version}</bundle>
</feature>
<feature name='camel-netty' version='${project.version}' resolver='(obr)' start-level='50'>