You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/04/03 12:20:15 UTC

[3/3] camel git commit: CAMEL-9803: Camel-NATS: Switch to Jnats client as Java_nats is deprecated

CAMEL-9803: Camel-NATS: Switch to Jnats client as Java_nats is deprecated


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66f0fe84
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66f0fe84
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66f0fe84

Branch: refs/heads/master
Commit: 66f0fe84c5a4dece014660ce4ebaf3e01fac94ec
Parents: 431ee2b
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sun Apr 3 12:02:38 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sun Apr 3 12:19:54 2016 +0200

----------------------------------------------------------------------
 components/camel-nats/pom.xml                   |  6 +-
 .../camel/component/nats/NatsConfiguration.java | 12 +--
 .../camel/component/nats/NatsConsumer.java      | 78 ++++++++++++++------
 .../camel/component/nats/NatsProducer.java      | 12 ++-
 .../component/nats/NatsPropertiesConstants.java | 16 ++--
 .../component/nats/NatsConsumerLoadTest.java    | 14 ++--
 .../nats/NatsConsumerMaxMessagesQueueTest.java  |  2 +-
 .../nats/NatsConsumerMaxMessagesTest.java       |  2 +-
 .../camel/component/nats/NatsConsumerTest.java  |  2 +-
 .../camel/component/nats/NatsProducerTest.java  |  1 +
 parent/pom.xml                                  |  3 +-
 .../features/src/main/resources/features.xml    |  2 +-
 12 files changed, 96 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-nats/pom.xml b/components/camel-nats/pom.xml
index 6ca8079..c2fd1dc 100644
--- a/components/camel-nats/pom.xml
+++ b/components/camel-nats/pom.xml
@@ -34,9 +34,9 @@
       <artifactId>camel-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.github.tyagihas</groupId>
-      <artifactId>java_nats</artifactId>
-      <version>${java-nats-version}</version>
+      <groupId>io.nats</groupId>
+      <artifactId>jnats</artifactId>
+      <version>${jnats-version}</version>
     </dependency>
     <!-- testing -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 260d1a7..1618eb9 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -218,12 +218,12 @@ public class NatsConfiguration {
         return props;
     }
 
-    public Properties createSubProperties() {
-        Properties props = new Properties();
-        addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
-        addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
-        return props;
-    }
+//    public Properties createSubProperties() {
+//        Properties props = new Properties();
+//        addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
+//        addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
+//        return props;
+//    }
 
     private String splitServers() {
         StringBuilder servers = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 9c8a29d..8be0aea 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -19,15 +19,21 @@ package org.apache.camel.component.nats;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-import org.nats.Connection;
-import org.nats.MsgHandler;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
+import io.nats.client.Message;
+import io.nats.client.MessageHandler;
+import io.nats.client.Subscription;
+
 public class NatsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class);
@@ -35,7 +41,7 @@ public class NatsConsumer extends DefaultConsumer {
     private final Processor processor;
     private ExecutorService executor;
     private Connection connection;
-    private int sid;
+    private Subscription sid;
 
     public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -67,7 +73,7 @@ public class NatsConsumer extends DefaultConsumer {
         connection.flush();
         
         try {
-            connection.unsubscribe(sid);
+            sid.unsubscribe();
         } catch (Exception e) {
             getExceptionHandler().handleException("Error during unsubscribing", e);
         }
@@ -83,14 +89,15 @@ public class NatsConsumer extends DefaultConsumer {
         executor = null;
         
         LOG.debug("Closing Nats Connection");
-        if (connection.isConnected()) {
+        if (!connection.isClosed()) {
             connection.close();   
         }
     }
 
-    private Connection getConnection() throws IOException, InterruptedException {
+    private Connection getConnection() throws IOException, InterruptedException, TimeoutException {
         Properties prop = getEndpoint().getNatsConfiguration().createProperties();
-        connection = Connection.connect(prop);
+        ConnectionFactory factory = new ConnectionFactory(prop);
+        connection = factory.createConnection();
         return connection;
     }
 
@@ -107,23 +114,50 @@ public class NatsConsumer extends DefaultConsumer {
         @Override
         public void run() {
             try {
-                sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), configuration.createSubProperties(), new MsgHandler() {
-                    public void execute(String msg) {
-                        LOG.debug("Received Message: {}", msg);
-                        Exchange exchange = getEndpoint().createExchange();
-                        exchange.getIn().setBody(msg);
-                        exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
-                        exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
-                        try {
-                            processor.process(exchange);
-                        } catch (Exception e) {
-                            getExceptionHandler().handleException("Error during processing", exchange, e);
+            	if (ObjectHelper.isNotEmpty(configuration.getQueueName())) {
+                    sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() {
+    					
+    					@Override
+    					public void onMessage(Message msg) {
+                            LOG.debug("Received Message: {}", msg);
+                            Exchange exchange = getEndpoint().createExchange();
+                            exchange.getIn().setBody(msg);
+                            exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+                            exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException("Error during processing", exchange, e);
+                            }
                         }
+    				});
+                    if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
+                    	sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
                     }
-                });
-            } catch (Throwable e) {
-                getExceptionHandler().handleException("Error during processing", e);
-            }
+            	} else {
+                    sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() {
+    					
+    					@Override
+    					public void onMessage(Message msg) {
+                            LOG.debug("Received Message: {}", msg);
+                            Exchange exchange = getEndpoint().createExchange();
+                            exchange.getIn().setBody(msg);
+                            exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+                            exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException("Error during processing", exchange, e);
+                            }
+                        }
+    				});
+                    if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
+                    	sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
+                    }    
+            	}
+                } catch (Throwable e) {
+                    getExceptionHandler().handleException("Error during processing", e);
+                }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 89b2b23..2e92f44 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -18,13 +18,16 @@ package org.apache.camel.component.nats;
 
 import java.io.IOException;
 import java.util.Properties;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
-import org.nats.Connection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
+
 public class NatsProducer extends DefaultProducer {
     
     private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);
@@ -64,14 +67,15 @@ public class NatsProducer extends DefaultProducer {
         LOG.debug("Stopping Nats Producer");
         
         LOG.debug("Closing Nats Connection");
-        if (connection != null && connection.isConnected()) {
+        if (connection != null && !connection.isClosed()) {
             connection.close();
         }
     }
 
-    private Connection getConnection() throws IOException, InterruptedException {
+    private Connection getConnection() throws TimeoutException, IOException {
         Properties prop = getEndpoint().getNatsConfiguration().createProperties();
-        connection = Connection.connect(prop);
+        ConnectionFactory factory = new ConnectionFactory(prop);
+        connection = factory.createConnection();
         return connection;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
index 8c09ce8..2e09361 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
@@ -18,14 +18,14 @@ package org.apache.camel.component.nats;
 
 public interface NatsPropertiesConstants {
     String NATS_PROPERTY_URI = "uri";
-    String NATS_PROPERTY_VERBOSE = "verbose";
-    String NATS_PROPERTY_PEDANTIC = "pedantic";
-    String NATS_PROPERTY_RECONNECT = "reconnect";
-    String NATS_PROPERTY_SSL = "ssl";
-    String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "max_reconnect_attempts";
-    String NATS_PROPERTY_RECONNECT_TIME_WAIT = "reconnect_time_wait";
-    String NATS_PROPERTY_PING_INTERVAL = "ping_interval";
-    String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "dont_randomize_servers";
+    String NATS_PROPERTY_VERBOSE = "io.nats.client.verbose";
+    String NATS_PROPERTY_PEDANTIC = "io.nats.client.pedantic";
+    String NATS_PROPERTY_RECONNECT = "io.nats.client.reconnect.allowed";
+    String NATS_PROPERTY_SSL = "io.nats.client.secure";
+    String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "io.nats.client.reconnect.max";
+    String NATS_PROPERTY_RECONNECT_TIME_WAIT = "io.nats.client.reconnect.wait";
+    String NATS_PROPERTY_PING_INTERVAL = "io.nats.client.pinginterval";
+    String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "io.nats.client.norandomize";
     String NATS_PROPERTY_QUEUE = "queue";
     String NATS_PROPERTY_MAX_MESSAGES = "max";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
index 67d1e7c..87d0c3e 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.nats;
 
 import java.io.IOException;
 import java.util.Properties;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
@@ -25,7 +26,9 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.nats.Connection;
+
+import io.nats.client.Connection;
+import io.nats.client.ConnectionFactory;
 
 @Ignore("Require a running Nats server")
 public class NatsConsumerLoadTest extends CamelTestSupport {
@@ -34,11 +37,11 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
     protected MockEndpoint mockResultEndpoint;
 
     @Test
-    public void testLoadConsumer() throws InterruptedException, IOException {
+    public void testLoadConsumer() throws InterruptedException, IOException, TimeoutException {
         mockResultEndpoint.setExpectedMessageCount(10000);
-        
-        Connection connection = Connection.connect(new Properties());
-        
+        ConnectionFactory cf = new ConnectionFactory("nats://localhost:4222");
+        Connection connection = cf.createConnection();
+
         for (int i = 0; i < 10000; i++) {
             connection.publish("test", ("test" + i).getBytes());
         }
@@ -51,6 +54,7 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+            	from("direct:send").to("nats://localhost:4222?topic=test");
                 from("nats://localhost:4222?topic=test").to(mockResultEndpoint);
             }
         };

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
index c637cef..b69a6b7 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
@@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesQueueTest extends CamelTestSupport {
 
     @Test
     public void testMaxConsumer() throws InterruptedException, IOException {
-        mockResultEndpoint.expectedBodiesReceivedInAnyOrder("test", "test1");
+        mockResultEndpoint.expectedBodiesReceivedInAnyOrder("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}");
         mockResultEndpoint.setExpectedMessageCount(2);
         
         template.sendBody("direct:send", "test");

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
index 6e7482e..5ee94d9 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
@@ -33,7 +33,7 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport {
 
     @Test
     public void testMaxConsumer() throws InterruptedException, IOException {
-        mockResultEndpoint.expectedBodiesReceived("test", "test1", "test2", "test3", "test4");
+        mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}", "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}");
         mockResultEndpoint.setExpectedMessageCount(5);
         template.sendBody("direct:send", "test");
         template.sendBody("direct:send", "test1");

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index c689ade..ca63048 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -34,7 +34,7 @@ public class NatsConsumerTest extends CamelTestSupport {
     @Test
     public void testConsumer() throws InterruptedException, IOException {
         mockResultEndpoint.expectedMessageCount(1);
-        mockResultEndpoint.expectedBodiesReceived("test");
+        mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}");
         template.requestBody("direct:send", "test");
 
         mockResultEndpoint.assertIsSatisfied();

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
index 8f10b4c..4a22551 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
@@ -26,6 +26,7 @@ public class NatsProducerTest extends CamelTestSupport {
     
     @Test
     public void sendTest() throws Exception {
+        
         template.sendBody("direct:send", "pippo");
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index b6dde19..386a760 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -267,8 +267,7 @@
     <java-apns-bundle-version>1.0.0.Beta6_1</java-apns-bundle-version>
     <java-apns-version>1.0.0.Beta6</java-apns-version>
     <java-ewah-version>0.7.9</java-ewah-version>
-    <java-nats-version>0.5.2</java-nats-version>
-    <java-nats-bundle-version>0.5.2_1</java-nats-bundle-version>
+    <jnats-version>0.4.0</jnats-version>
     <javacc-maven-plugin-version>2.6</javacc-maven-plugin-version>
     <javacrumbs-version>0.22</javacrumbs-version>
     <javassist-bundle-version>3.12.1.GA_3</javassist-bundle-version>

http://git-wip-us.apache.org/repos/asf/camel/blob/66f0fe84/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index dd084e9..e5c0ea9 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1152,7 +1152,7 @@
   </feature>
   <feature name='camel-nats' version='${project.version}' resolver='(obr)' start-level='50'>
     <feature version='${project.version}'>camel-core</feature>
-    <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.java_nats/${java-nats-bundle-version}</bundle>
+    <bundle dependency='true'>wrap:mvn:io.nats/jnats/${jnats-version}</bundle>
     <bundle>mvn:org.apache.camel/camel-nats/${project.version}</bundle>
   </feature>
   <feature name='camel-netty' version='${project.version}' resolver='(obr)' start-level='50'>