You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/19 07:51:17 UTC
[camel] 01/02: CAMEL-12664 - Camel-Nats: Bump to version 2.0.0 of
Jnats
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit e3a35a81b6a3c3cfd933db46e7c5097a7b46f453
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Jul 19 09:18:32 2018 +0200
CAMEL-12664 - Camel-Nats: Bump to version 2.0.0 of Jnats
---
.../camel-nats/src/main/docs/nats-component.adoc | 4 +-
.../camel/component/nats/NatsConfiguration.java | 73 +++++++++-------------
.../apache/camel/component/nats/NatsConstants.java | 1 -
.../apache/camel/component/nats/NatsConsumer.java | 61 +++++++++---------
.../apache/camel/component/nats/NatsProducer.java | 24 +++----
.../camel/component/nats/NatsConsumerLoadTest.java | 7 ++-
.../nats/NatsConsumerMaxMessagesQueueTest.java | 1 -
.../nats/NatsConsumerMaxMessagesTest.java | 2 -
.../camel/component/nats/NatsConsumerTest.java | 1 -
parent/pom.xml | 2 +-
10 files changed, 77 insertions(+), 99 deletions(-)
diff --git a/components/camel-nats/src/main/docs/nats-component.adoc b/components/camel-nats/src/main/docs/nats-component.adoc
index 1d3f78c..d356918 100644
--- a/components/camel-nats/src/main/docs/nats-component.adoc
+++ b/components/camel-nats/src/main/docs/nats-component.adoc
@@ -66,7 +66,7 @@ with the following path and query parameters:
|===
-==== Query Parameters (22 parameters):
+==== Query Parameters (20 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -91,9 +91,7 @@ with the following path and query parameters:
| *replySubject* (producer) | the subject to which subscribers should send response | | String
| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
| *secure* (security) | Set secure option indicating TLS is required | false | boolean
-| *ssl* (security) | Whether or not using SSL | false | boolean
| *sslContextParameters* (security) | To configure security using SSLContextParameters | | SSLContextParameters
-| *tlsDebug* (security) | TLS Debug, it will add additional console output | false | boolean
|===
// endpoint options: END
// spring-boot-auto-configure options: START
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
index 3e6fd20..5959a76 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -16,7 +16,8 @@
*/
package org.apache.camel.component.nats;
-import java.util.Properties;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
@@ -24,6 +25,9 @@ import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.jsse.SSLContextParameters;
+import io.nats.client.Options;
+import io.nats.client.Options.Builder;
+
@UriParams
public class NatsConfiguration {
@@ -39,8 +43,6 @@ public class NatsConfiguration {
private boolean pedantic;
@UriParam
private boolean verbose;
- @UriParam(label = "security")
- private boolean ssl;
@UriParam(defaultValue = "2000")
private int reconnectTimeWait = 2000;
@UriParam(defaultValue = "3")
@@ -64,8 +66,6 @@ public class NatsConfiguration {
@UriParam(label = "security")
private boolean secure;
@UriParam(label = "security")
- private boolean tlsDebug;
- @UriParam(label = "security")
private SSLContextParameters sslContextParameters;
/**
@@ -124,17 +124,6 @@ public class NatsConfiguration {
}
/**
- * Whether or not using SSL
- */
- public boolean getSsl() {
- return ssl;
- }
-
- public void setSsl(boolean ssl) {
- this.ssl = ssl;
- }
-
- /**
* Waiting time before attempts reconnection (in milliseconds)
*/
public int getReconnectTimeWait() {
@@ -257,17 +246,6 @@ public class NatsConfiguration {
}
/**
- * TLS Debug, it will add additional console output
- */
- public boolean isTlsDebug() {
- return tlsDebug;
- }
-
- public void setTlsDebug(boolean tlsDebug) {
- this.tlsDebug = tlsDebug;
- }
-
- /**
* To configure security using SSLContextParameters
*/
public SSLContextParameters getSslContextParameters() {
@@ -278,24 +256,29 @@ public class NatsConfiguration {
this.sslContextParameters = sslContextParameters;
}
- private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
- if (value != null) {
- props.put(key, value);
- }
- }
-
- public Properties createProperties() {
- Properties props = new Properties();
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_URL, splitServers());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_VERBOSE, getVerbose());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_PEDANTIC, getPedantic());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_SSL, getSsl());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_RECONNECT, getReconnect());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS, getMaxReconnectAttempts());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_RECONNECT_TIME_WAIT, getReconnectTimeWait());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_PING_INTERVAL, getPingInterval());
- addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_DONT_RANDOMIZE_SERVERS, getNoRandomizeServers());
- return props;
+ public Builder createOptions() throws NoSuchAlgorithmException, IllegalArgumentException {
+ Builder builder = new Options.Builder();
+ builder.server(splitServers());
+ if (getVerbose()) {
+ builder.verbose();
+ }
+ if (getPedantic()) {
+ builder.pedantic();
+ }
+ if (isSecure()) {
+ builder.secure();
+ }
+ if (!getReconnect()) {
+ builder.noReconnect();
+ } else {
+ builder.maxReconnects(getMaxReconnectAttempts());
+ builder.reconnectWait(Duration.ofMillis(getReconnectTimeWait()));
+ }
+ builder.pingInterval(Duration.ofMillis(getPingInterval()));
+ if (getNoRandomizeServers()) {
+ builder.noRandomize();
+ }
+ return builder;
}
private String splitServers() {
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
index 9bdee5d..2a3ea87 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
@@ -19,5 +19,4 @@ package org.apache.camel.component.nats;
public interface NatsConstants {
String NATS_MESSAGE_TIMESTAMP = "CamelNatsMessageTimestamp";
- String NATS_SUBSCRIPTION_ID = "CamelNatsSubscriptionId";
}
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 95bc0e3..529c6f9 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -18,17 +18,19 @@ package org.apache.camel.component.nats;
import java.io.IOException;
import java.security.GeneralSecurityException;
-import java.util.Properties;
+import java.time.Duration;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import io.nats.client.Connection;
-import io.nats.client.ConnectionFactory;
+import io.nats.client.Connection.Status;
+import io.nats.client.Dispatcher;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
-import io.nats.client.Subscription;
+import io.nats.client.Nats;
+import io.nats.client.Options;
+import io.nats.client.Options.Builder;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -44,8 +46,8 @@ public class NatsConsumer extends DefaultConsumer {
private final Processor processor;
private ExecutorService executor;
private Connection connection;
- private Subscription sid;
- private boolean subscribed;
+ private Dispatcher dispatcher;
+ private boolean active;
public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -75,11 +77,11 @@ public class NatsConsumer extends DefaultConsumer {
if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
LOG.debug("Flushing Messages before stopping");
- connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout());
+ connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout()));
}
try {
- sid.unsubscribe();
+ dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic());
} catch (Exception e) {
getExceptionHandler().handleException("Error during unsubscribing", e);
}
@@ -95,31 +97,28 @@ public class NatsConsumer extends DefaultConsumer {
executor = null;
LOG.debug("Closing Nats Connection");
- if (!connection.isClosed()) {
+ if (!connection.getStatus().equals(Status.CLOSED)) {
connection.close();
}
}
- private Connection getConnection() throws IOException, InterruptedException, TimeoutException, GeneralSecurityException {
- Properties prop = getEndpoint().getNatsConfiguration().createProperties();
- ConnectionFactory factory = new ConnectionFactory(prop);
+ private Connection getConnection() throws InterruptedException, IllegalArgumentException, GeneralSecurityException, IOException {
+ Builder builder = getEndpoint().getNatsConfiguration().createOptions();
if (getEndpoint().getNatsConfiguration().getSslContextParameters() != null && getEndpoint().getNatsConfiguration().isSecure()) {
SSLContext sslCtx = getEndpoint().getNatsConfiguration().getSslContextParameters().createSSLContext(getEndpoint().getCamelContext());
- factory.setSSLContext(sslCtx);
- if (getEndpoint().getNatsConfiguration().isTlsDebug()) {
- factory.setTlsDebug(getEndpoint().getNatsConfiguration().isTlsDebug());
- }
+ builder.sslContext(sslCtx);
}
- connection = factory.createConnection();
+ Options options = builder.build();
+ connection = Nats.connect(options);
return connection;
}
- public boolean isSubscribed() {
- return subscribed;
+ public boolean isActive() {
+ return active;
}
- public void setSubscribed(boolean subscribed) {
- this.subscribed = subscribed;
+ public void setActive(boolean active) {
+ this.active = active;
}
class NatsConsumingTask implements Runnable {
@@ -136,14 +135,13 @@ public class NatsConsumer extends DefaultConsumer {
public void run() {
try {
if (ObjectHelper.isNotEmpty(configuration.getQueueName())) {
- sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName(), new MessageHandler() {
+ dispatcher = connection.createDispatcher(new MessageHandler() {
@Override
public void onMessage(Message msg) {
LOG.debug("Received Message: {}", msg);
Exchange exchange = getEndpoint().createExchange();
exchange.getIn().setBody(msg);
exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
- exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
try {
processor.process(exchange);
} catch (Exception e) {
@@ -151,21 +149,21 @@ public class NatsConsumer extends DefaultConsumer {
}
}
});
+ dispatcher = dispatcher.subscribe(getEndpoint().getNatsConfiguration().getTopic(), getEndpoint().getNatsConfiguration().getQueueName());
if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
- sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
+ dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic(), Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
}
- if (sid.isValid()) {
- setSubscribed(true);
+ if (dispatcher.isActive()) {
+ setActive(true);
}
} else {
- sid = connection.subscribe(getEndpoint().getNatsConfiguration().getTopic(), new MessageHandler() {
+ dispatcher = connection.createDispatcher(new MessageHandler() {
@Override
public void onMessage(Message msg) {
LOG.debug("Received Message: {}", msg);
Exchange exchange = getEndpoint().createExchange();
exchange.getIn().setBody(msg);
exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
- exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIPTION_ID, sid);
try {
processor.process(exchange);
} catch (Exception e) {
@@ -173,11 +171,12 @@ public class NatsConsumer extends DefaultConsumer {
}
}
});
+ dispatcher = dispatcher.subscribe(getEndpoint().getNatsConfiguration().getTopic());
if (ObjectHelper.isNotEmpty(getEndpoint().getNatsConfiguration().getMaxMessages())) {
- sid.autoUnsubscribe(Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
+ dispatcher.unsubscribe(getEndpoint().getNatsConfiguration().getTopic(), Integer.parseInt(getEndpoint().getNatsConfiguration().getMaxMessages()));
}
- if (sid.isValid()) {
- setSubscribed(true);
+ if (dispatcher.isActive()) {
+ setActive(true);
}
}
} catch (Throwable e) {
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
index 1be13e3..c8ac879 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -18,13 +18,18 @@ package org.apache.camel.component.nats;
import java.io.IOException;
import java.security.GeneralSecurityException;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import io.nats.client.Connection;
-import io.nats.client.ConnectionFactory;
+import io.nats.client.Connection.Status;
+import io.nats.client.Nats;
+import io.nats.client.Options;
+import io.nats.client.Options.Builder;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
@@ -77,26 +82,23 @@ public class NatsProducer extends DefaultProducer {
LOG.debug("Stopping Nats Producer");
LOG.debug("Closing Nats Connection");
- if (connection != null && !connection.isClosed()) {
+ if (connection != null && !connection.getStatus().equals(Status.CLOSED)) {
if (getEndpoint().getNatsConfiguration().isFlushConnection()) {
LOG.debug("Flushing Nats Connection");
- connection.flush(getEndpoint().getNatsConfiguration().getFlushTimeout());
+ connection.flush(Duration.ofMillis(getEndpoint().getNatsConfiguration().getFlushTimeout()));
}
connection.close();
}
}
- private Connection getConnection() throws TimeoutException, IOException, GeneralSecurityException {
- Properties prop = getEndpoint().getNatsConfiguration().createProperties();
- ConnectionFactory factory = new ConnectionFactory(prop);
+ private Connection getConnection() throws InterruptedException, IllegalArgumentException, GeneralSecurityException, IOException {
+ Builder builder = getEndpoint().getNatsConfiguration().createOptions();
if (getEndpoint().getNatsConfiguration().getSslContextParameters() != null && getEndpoint().getNatsConfiguration().isSecure()) {
SSLContext sslCtx = getEndpoint().getNatsConfiguration().getSslContextParameters().createSSLContext(getEndpoint().getCamelContext());
- factory.setSSLContext(sslCtx);
- if (getEndpoint().getNatsConfiguration().isTlsDebug()) {
- factory.setTlsDebug(getEndpoint().getNatsConfiguration().isTlsDebug());
- }
+ builder.sslContext(sslCtx);
}
- connection = factory.createConnection();
+ Options options = builder.build();
+ connection = Nats.connect(options);
return connection;
}
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
index d1a0350..830e031 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
@@ -20,7 +20,8 @@ import java.io.IOException;
import java.util.concurrent.TimeoutException;
import io.nats.client.Connection;
-import io.nats.client.ConnectionFactory;
+import io.nats.client.Nats;
+import io.nats.client.Options;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
@@ -38,8 +39,8 @@ public class NatsConsumerLoadTest extends CamelTestSupport {
@Test
public void testLoadConsumer() throws InterruptedException, IOException, TimeoutException {
mockResultEndpoint.setExpectedMessageCount(10000);
- ConnectionFactory cf = new ConnectionFactory("nats://localhost:4222");
- Connection connection = cf.createConnection();
+ Options options = new Options.Builder().server("nats://localhost:4222").build();
+ Connection connection = Nats.connect(options);
for (int i = 0; i < 10000; i++) {
connection.publish("test", ("test" + i).getBytes());
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
index b69a6b7..5c42eae 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
@@ -33,7 +33,6 @@ public class NatsConsumerMaxMessagesQueueTest extends CamelTestSupport {
@Test
public void testMaxConsumer() throws InterruptedException, IOException {
- mockResultEndpoint.expectedBodiesReceivedInAnyOrder("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}");
mockResultEndpoint.setExpectedMessageCount(2);
template.sendBody("direct:send", "test");
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
index 7f4d434..d822a97 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
@@ -33,8 +33,6 @@ public class NatsConsumerMaxMessagesTest extends CamelTestSupport {
@Test
public void testMaxConsumer() throws InterruptedException, IOException {
- mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}", "{Subject=test;Reply=null;Payload=<test1>}",
- "{Subject=test;Reply=null;Payload=<test2>}", "{Subject=test;Reply=null;Payload=<test3>}", "{Subject=test;Reply=null;Payload=<test4>}");
mockResultEndpoint.setExpectedMessageCount(5);
template.sendBody("direct:send", "test");
template.sendBody("direct:send", "test1");
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
index 24d4877..86294f1 100644
--- a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -34,7 +34,6 @@ public class NatsConsumerTest extends CamelTestSupport {
@Test
public void testConsumer() throws InterruptedException, IOException {
mockResultEndpoint.expectedMessageCount(1);
- mockResultEndpoint.expectedBodiesReceived("{Subject=test;Reply=null;Payload=<test>}");
template.requestBody("direct:send", "test");
mockResultEndpoint.assertIsSatisfied();
diff --git a/parent/pom.xml b/parent/pom.xml
index 17a49f1..96a0bf1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -346,7 +346,7 @@
<java-util-version>1.34.0</java-util-version>
<java-util-bundle-version>1.34.0_1</java-util-bundle-version>
<jna-version>4.2.2</jna-version>
- <jnats-version>1.0</jnats-version>
+ <jnats-version>2.0.0</jnats-version>
<javacc-maven-plugin-version>2.6</javacc-maven-plugin-version>
<javacrumbs-version>0.22</javacrumbs-version>
<javapoet-version>1.11.1</javapoet-version>