You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/01/18 14:30:54 UTC
[1/3] camel git commit: CAMEL-9267: camel-nats - A java client
component for the nats broker
Repository: camel
Updated Branches:
refs/heads/master b26e6a152 -> 2b22c3f28
CAMEL-9267: camel-nats - A java client component for the nats broker
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/56b815b6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/56b815b6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/56b815b6
Branch: refs/heads/master
Commit: 56b815b60f58dc9d207f9c5f078c55e198448a1c
Parents: b26e6a1
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Jan 18 10:03:40 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Jan 18 10:03:40 2016 +0100
----------------------------------------------------------------------
components/camel-nats/pom.xml | 81 +++++++
.../camel/component/nats/NatsComponent.java | 35 +++
.../camel/component/nats/NatsConfiguration.java | 227 +++++++++++++++++++
.../camel/component/nats/NatsConstants.java | 23 ++
.../camel/component/nats/NatsConsumer.java | 126 ++++++++++
.../camel/component/nats/NatsEndpoint.java | 68 ++++++
.../camel/component/nats/NatsProducer.java | 78 +++++++
.../component/nats/NatsPropertiesConstants.java | 31 +++
.../src/main/resources/META-INF/LICENSE.txt | 203 +++++++++++++++++
.../src/main/resources/META-INF/NOTICE.txt | 11 +
.../services/org/apache/camel/component/nats | 18 ++
.../component/nats/NatsConsumerLoadTest.java | 59 +++++
.../nats/NatsConsumerMaxMessagesQueueTest.java | 56 +++++
.../nats/NatsConsumerMaxMessagesTest.java | 63 +++++
.../camel/component/nats/NatsConsumerTest.java | 53 +++++
.../camel/component/nats/NatsProducerTest.java | 41 ++++
.../src/test/resources/log4j.properties | 36 +++
17 files changed, 1209 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-nats/pom.xml b/components/camel-nats/pom.xml
new file mode 100644
index 0000000..841e739
--- /dev/null
+++ b/components/camel-nats/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<!-- Maven build descriptor for the camel-nats component (a Camel component backed by the java_nats client). -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <!-- dependency versions that are not declared below are presumably managed by this parent - verify -->
+ <parent>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>components</artifactId>
+ <version>2.17-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-nats</artifactId>
+ <packaging>bundle</packaging>
+ <name>Camel :: Nats</name>
+
+ <!-- OSGi export settings: the component package and the "nats" component resolver service -->
+ <properties>
+ <camel.osgi.export.pkg>org.apache.camel.component.nats.*</camel.osgi.export.pkg>
+ <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=nats</camel.osgi.export.service>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core</artifactId>
+ </dependency>
+ <!-- NATS client library; unlike the other dependencies its version is pinned directly in this POM -->
+ <dependency>
+ <groupId>com.github.tyagihas</groupId>
+ <artifactId>java_nats</artifactId>
+ <version>0.5.2</version>
+ </dependency>
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- mocking (test scope only) -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- log4j backend for the test-scoped slf4j binding above -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <!-- one forked JVM at a time, never reused between test classes; a fork is killed after 300 seconds -->
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsComponent.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsComponent.java
new file mode 100644
index 0000000..2ccfe36
--- /dev/null
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsComponent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
+
+/**
+ * Camel component that creates a {@link NatsEndpoint} for every <tt>nats:</tt> URI.
+ */
+public class NatsComponent extends DefaultComponent {
+
+    /**
+     * Creates the endpoint for the given URI.
+     *
+     * @param uri        the full endpoint URI
+     * @param remaining  the URI path, used as the comma separated list of NATS servers
+     * @param parameters the URI query options, bound onto a fresh {@link NatsConfiguration}
+     * @return a new {@link NatsEndpoint} backed by that configuration
+     */
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        NatsConfiguration natsConfiguration = new NatsConfiguration();
+        // query options are bound first; the servers value is then always taken from the URI path
+        setProperties(natsConfiguration, parameters);
+        natsConfiguration.setServers(remaining);
+        return new NatsEndpoint(uri, this, natsConfiguration);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
new file mode 100644
index 0000000..43182b9
--- /dev/null
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConfiguration.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.util.Properties;
+
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+
+@UriParams
+public class NatsConfiguration {
+
+ @UriPath @Metadata(required = "true")
+ private String servers;
+ @UriParam @Metadata(required = "true")
+ private String topic;
+ @UriParam(defaultValue = "true")
+ private boolean reconnect = true;
+ @UriParam(defaultValue = "false")
+ private boolean pedantic;
+ @UriParam(defaultValue = "false")
+ private boolean verbose;
+ @UriParam(defaultValue = "false")
+ private boolean ssl;
+ @UriParam(defaultValue = "2000")
+ private int reconnectTimeWait = 2000;
+ @UriParam(defaultValue = "3")
+ private int maxReconnectAttempts = 3;
+ @UriParam(defaultValue = "4000")
+ private int pingInterval = 4000;
+ @UriParam(defaultValue = "false")
+ private boolean noRandomizeServers;
+ @UriParam(label = "consumer")
+ private String queueName;
+ @UriParam(label = "consumer")
+ private String maxMessages;
+ @UriParam(label = "consumer", defaultValue = "10")
+ private int poolSize = 10;
+
+ /**
+ * The Nats servers
+ */
+ public String getServers() {
+ return servers;
+ }
+ public void setServers(String servers) {
+ this.servers = servers;
+ }
+
+ /**
+ * The name of topic we want to use
+ */
+ public String getTopic() {
+ return topic;
+ }
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Whether or not using reconnection feature
+ */
+ public boolean getReconnect() {
+ return reconnect;
+ }
+ public void setReconnect(boolean reconnect) {
+ this.reconnect = reconnect;
+ }
+
+ /**
+ * Whether or not running in pedantic mode (this affects performace)
+ */
+ public boolean getPedantic() {
+ return pedantic;
+ }
+ public void setPedantic(boolean pedantic) {
+ this.pedantic = pedantic;
+ }
+
+ /**
+ * Whether or not running in verbose mode
+ */
+ public boolean getVerbose() {
+ return verbose;
+ }
+ public void setVerbose(boolean verbose) {
+ this.verbose = verbose;
+ }
+
+ /**
+ * Whether or not using SSL
+ */
+ public boolean getSsl() {
+ return ssl;
+ }
+ public void setSsl(boolean ssl) {
+ this.ssl = ssl;
+ }
+
+ /**
+ * Waiting time before attempts reconnection (in milliseconds)
+ */
+ public int getReconnectTimeWait() {
+ return reconnectTimeWait;
+ }
+ public void setReconnectTimeWait(int reconnectTimeWait) {
+ this.reconnectTimeWait = reconnectTimeWait;
+ }
+
+ /**
+ * Max reconnection attempts
+ */
+ public int getMaxReconnectAttempts() {
+ return maxReconnectAttempts;
+ }
+ public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+ this.maxReconnectAttempts = maxReconnectAttempts;
+ }
+
+ /**
+ * Ping interval to be aware if connection is still alive (in milliseconds)
+ */
+ public int getPingInterval() {
+ return pingInterval;
+ }
+ public void setPingInterval(int pingInterval) {
+ this.pingInterval = pingInterval;
+ }
+
+ /**
+ * Whether or not randomizing the order of servers for the connection attempts
+ */
+ public boolean getNoRandomizeServers() {
+ return noRandomizeServers;
+ }
+ public void setNoRandomizeServers(boolean noRandomizeServers) {
+ this.noRandomizeServers = noRandomizeServers;
+ }
+
+ /**
+ * The Queue name if we are using nats for a queue configuration
+ */
+ public String getQueueName() {
+ return queueName;
+ }
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ /**
+ * Stop receiving messages from a topic we are subscribing to after maxMessages
+ */
+ public String getMaxMessages() {
+ return maxMessages;
+ }
+ public void setMaxMessages(String maxMessages) {
+ this.maxMessages = maxMessages;
+ }
+
+ /**
+ * Consumer pool size
+ */
+ public int getPoolSize() {
+ return poolSize;
+ }
+ public void setPoolSize(int poolSize) {
+ this.poolSize = poolSize;
+ }
+
+ private static <T> void addPropertyIfNotNull(Properties props, String key, T value) {
+ if (value != null) {
+ props.put(key, value);
+ }
+ }
+
+ public Properties createProperties() {
+ Properties props = new Properties();
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_URI, splitServers());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_VERBOSE, getVerbose());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_PEDANTIC, getPedantic());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_SSL, getSsl());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_RECONNECT, getReconnect());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS, getMaxReconnectAttempts());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_RECONNECT_TIME_WAIT, getReconnectTimeWait());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_PING_INTERVAL, getPingInterval());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_DONT_RANDOMIZE_SERVERS, getNoRandomizeServers());
+ return props;
+ }
+
+ public Properties createSubProperties() {
+ Properties props = new Properties();
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_QUEUE, getQueueName());
+ addPropertyIfNotNull(props, NatsPropertiesConstants.NATS_PROPERTY_MAX_MESSAGES, getMaxMessages());
+ return props;
+ }
+
+ private String splitServers() {
+ StringBuilder servers = new StringBuilder();
+ String prefix = "nats://";
+
+ String[] pieces = getServers().split(",");
+ for (int i = 0; i < pieces.length; i++) {
+ if (i < pieces.length - 1) {
+ servers.append(prefix + pieces[i] + ",");
+ } else {
+ servers.append(prefix + pieces[i]);
+ }
+ }
+ return servers.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
new file mode 100644
index 0000000..1d38369
--- /dev/null
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConstants.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+/**
+ * Names of the message headers set by the Nats consumer on every exchange it creates.
+ */
+public interface NatsConstants {
+
+    /**
+     * Header holding the subscription id returned when the consumer subscribed to the topic.
+     */
+    String NATS_SUBSCRIBE_SID = "natsSubscribeSid";
+
+    /**
+     * Header holding the time (milliseconds since the epoch) at which the consumer received the message.
+     */
+    String NATS_MESSAGE_TIMESTAMP = "natsMessageTimestamp";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
new file mode 100644
index 0000000..e63f485
--- /dev/null
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.nats.Connection;
+import org.nats.MsgHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumer that subscribes to the configured NATS topic and hands every
+ * received message to the Camel processor as a new exchange.
+ */
+public class NatsConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NatsConsumer.class);
+
+    protected ExecutorService executor;
+    private final NatsEndpoint endpoint;
+    private final Processor processor;
+    private Connection connection;
+    private int sid;
+
+    public NatsConsumer(NatsEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.processor = processor;
+    }
+
+    @Override
+    public NatsEndpoint getEndpoint() {
+        return (NatsEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Creates the executor, connects to NATS and submits the task that subscribes to the topic.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        LOG.debug("Starting Nats Consumer");
+        executor = endpoint.createExecutor();
+
+        LOG.debug("Getting Nats Connection");
+        connection = getConnection();
+
+        executor.submit(new NatsConsumingTask(connection, getEndpoint().getNatsConfiguration()));
+    }
+
+    /**
+     * Shuts down the executor and closes the NATS connection if it is open.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        LOG.debug("Stopping Nats Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
+                getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
+
+        LOG.debug("Closing Nats Connection");
+        // the connection is still null when doStart() failed before the connect succeeded
+        if (connection != null && connection.isConnected()) {
+            connection.close();
+        }
+    }
+
+    /**
+     * Opens a new NATS connection from the endpoint configuration.
+     */
+    private Connection getConnection() throws IOException, InterruptedException {
+        Properties prop = getEndpoint().getNatsConfiguration().createProperties();
+        return Connection.connect(prop);
+    }
+
+    /**
+     * Task that subscribes to the topic; the registered handler turns every
+     * message into an exchange and passes it to the processor.
+     */
+    class NatsConsumingTask implements Runnable {
+
+        private final Connection connection;
+
+        private final NatsConfiguration configuration;
+
+        public NatsConsumingTask(Connection connection, NatsConfiguration configuration) {
+            this.connection = connection;
+            this.configuration = configuration;
+        }
+
+        @Override
+        public void run() {
+            try {
+                // NOTE(review): sid is only assigned once subscribe() returns, so a message
+                // delivered before that would carry the previous value in its header - confirm
+                sid = connection.subscribe(configuration.getTopic(), configuration.createSubProperties(), new MsgHandler() {
+                    public void execute(String msg) {
+                        LOG.debug("Received Message: {}", msg);
+                        Exchange exchange = getEndpoint().createExchange();
+                        exchange.getIn().setBody(msg);
+                        exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+                        exchange.getIn().setHeader(NatsConstants.NATS_SUBSCRIBE_SID, sid);
+                        try {
+                            processor.process(exchange);
+                        } catch (Exception e) {
+                            getExceptionHandler().handleException("Error during processing", exchange, e);
+                        }
+                    }
+                });
+            } catch (IOException e1) {
+                getExceptionHandler().handleException("Error during processing", e1);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
new file mode 100644
index 0000000..6a076d4
--- /dev/null
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsEndpoint.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@UriEndpoint(scheme = "nats", title = "Nats", syntax = "nats:host", label = "messaging")
+public class NatsEndpoint extends DefaultEndpoint {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NatsEndpoint.class);
+
+ @UriParam
+ private NatsConfiguration configuration;
+
+ public NatsEndpoint(String uri, NatsComponent component, NatsConfiguration config) {
+ super(uri, component);
+ this.configuration = config;
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new NatsProducer(this);
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new NatsConsumer(this, processor);
+ }
+
+ public ExecutorService createExecutor() {
+ return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "NatsTopic[" + configuration.getTopic() + "]", configuration.getPoolSize());
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return false;
+ }
+
+ /**
+ * The nats Configuration
+ */
+ public NatsConfiguration getNatsConfiguration() {
+ return configuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
new file mode 100644
index 0000000..92e2424
--- /dev/null
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsProducer.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.nats.Connection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Producer that publishes the body of every exchange, converted to a String,
+ * to the configured NATS topic.
+ */
+public class NatsProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);
+
+    private Connection connection;
+
+    public NatsProducer(NatsEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public NatsEndpoint getEndpoint() {
+        return (NatsEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Publishes the message body to the configured topic.
+     *
+     * @throws Exception an InvalidPayloadException if the exchange has no body or
+     *                   the body cannot be converted to a String; otherwise whatever
+     *                   the NATS client throws while publishing
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        NatsConfiguration config = getEndpoint().getNatsConfiguration();
+        // a mandatory body gives a descriptive exception instead of a NullPointerException
+        String body = exchange.getIn().getMandatoryBody(String.class);
+        // NOTE(review): getBytes() uses the platform default charset - confirm what the client expects
+        connection.publish(config.getTopic(), body.getBytes());
+    }
+
+    /**
+     * Connects to NATS when the producer is started.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        LOG.debug("Starting Nats Producer");
+
+        LOG.debug("Getting Nats Connection");
+        connection = getConnection();
+    }
+
+    /**
+     * Closes the NATS connection if it is open.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        LOG.debug("Stopping Nats Producer");
+
+        LOG.debug("Closing Nats Connection");
+        // the connection is still null when doStart() failed before the connect succeeded
+        if (connection != null && connection.isConnected()) {
+            connection.close();
+        }
+    }
+
+    /**
+     * Opens a new NATS connection from the endpoint configuration.
+     */
+    private Connection getConnection() throws IOException, InterruptedException {
+        Properties prop = getEndpoint().getNatsConfiguration().createProperties();
+        return Connection.connect(prop);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
new file mode 100644
index 0000000..8c09ce8
--- /dev/null
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsPropertiesConstants.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+/**
+ * Keys of the properties that the Nats configuration hands to the NATS client.
+ */
+public interface NatsPropertiesConstants {
+
+    // keys used when opening the connection
+    String NATS_PROPERTY_URI = "uri";
+    String NATS_PROPERTY_SSL = "ssl";
+    String NATS_PROPERTY_VERBOSE = "verbose";
+    String NATS_PROPERTY_PEDANTIC = "pedantic";
+    String NATS_PROPERTY_PING_INTERVAL = "ping_interval";
+    String NATS_PROPERTY_DONT_RANDOMIZE_SERVERS = "dont_randomize_servers";
+
+    // keys controlling reconnection
+    String NATS_PROPERTY_RECONNECT = "reconnect";
+    String NATS_PROPERTY_RECONNECT_TIME_WAIT = "reconnect_time_wait";
+    String NATS_PROPERTY_MAX_RECONNECT_ATTEMPTS = "max_reconnect_attempts";
+
+    // keys used when subscribing
+    String NATS_PROPERTY_QUEUE = "queue";
+    String NATS_PROPERTY_MAX_MESSAGES = "max";
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/resources/META-INF/LICENSE.txt
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/resources/META-INF/LICENSE.txt b/components/camel-nats/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/components/camel-nats/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/resources/META-INF/NOTICE.txt
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/resources/META-INF/NOTICE.txt b/components/camel-nats/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/components/camel-nats/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+ =========================================================================
+ == NOTICE file corresponding to the section 4 d of ==
+ == the Apache License, Version 2.0, ==
+ == in this case for the Apache Camel distribution. ==
+ =========================================================================
+
+ This product includes software developed by
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Please read the different LICENSE files present in the licenses directory of
+ this distribution.
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/main/resources/META-INF/services/org/apache/camel/component/nats
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/main/resources/META-INF/services/org/apache/camel/component/nats b/components/camel-nats/src/main/resources/META-INF/services/org/apache/camel/component/nats
new file mode 100644
index 0000000..e2e3a19
--- /dev/null
+++ b/components/camel-nats/src/main/resources/META-INF/services/org/apache/camel/component/nats
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class=org.apache.camel.component.nats.NatsComponent
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
new file mode 100644
index 0000000..67d1e7c
--- /dev/null
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerLoadTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.nats.Connection;
+
+/**
+ * Load test for the NATS consumer: a batch of messages is published with a raw NATS client
+ * connection and every one of them must reach the mock endpoint through the consumer route.
+ * <p>
+ * Ignored by default because, as the annotation states, it requires a running NATS server.
+ * NOTE(review): the client is opened with empty {@link Properties}; presumably its default
+ * server address matches the {@code localhost:4222} used by the route - confirm.
+ */
+@Ignore("Require a running Nats server")
+public class NatsConsumerLoadTest extends CamelTestSupport {
+
+    /** Number of messages published and expected on the mock endpoint. */
+    private static final int MESSAGE_COUNT = 10000;
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mockResultEndpoint;
+
+    /**
+     * Publishes {@link #MESSAGE_COUNT} messages ("test0", "test1", ...) on the "test" subject
+     * and asserts that the mock endpoint receives that many messages.
+     *
+     * @throws InterruptedException if waiting for the mock endpoint is interrupted
+     * @throws IOException if the NATS client fails to connect, publish or close
+     */
+    @Test
+    public void testLoadConsumer() throws InterruptedException, IOException {
+        mockResultEndpoint.setExpectedMessageCount(MESSAGE_COUNT);
+
+        Connection connection = Connection.connect(new Properties());
+        try {
+            for (int i = 0; i < MESSAGE_COUNT; i++) {
+                connection.publish("test", ("test" + i).getBytes());
+            }
+
+            mockResultEndpoint.assertIsSatisfied();
+        } finally {
+            // Always release the client connection, even when publishing or the assertion fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Builds the single route under test: NATS subject "test" to the mock endpoint.
+     */
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("nats://localhost:4222?topic=test").to(mockResultEndpoint);
+            }
+        };
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
new file mode 100644
index 0000000..c637cef
--- /dev/null
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesQueueTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.io.IOException;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Sends two messages through the NATS producer route while two consumer routes ("cons1" and
+ * "cons2") listen on the same topic with {@code queueName=test}; the mock endpoint must end up
+ * with the two bodies, in any order.
+ * <p>
+ * NOTE(review): presumably this checks that a queue group hands each message to only one of
+ * the two consumers - confirm against the NATS queue semantics.
+ */
+@Ignore("Require a running Nats server")
+public class NatsConsumerMaxMessagesQueueTest extends CamelTestSupport {
+
+    /** Endpoint URI shared by the producer route and, with extra options, both consumer routes. */
+    private static final String NATS_URI = "nats://localhost:4222?topic=test";
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mockResultEndpoint;
+
+    @Test
+    public void testMaxConsumer() throws InterruptedException, IOException {
+        mockResultEndpoint.expectedBodiesReceivedInAnyOrder("test", "test1");
+        mockResultEndpoint.setExpectedMessageCount(2);
+
+        final String[] payloads = {"test", "test1"};
+        for (String payload : payloads) {
+            template.sendBody("direct:send", payload);
+        }
+
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Producer side
+                from("direct:send").to(NATS_URI);
+                // Two consumers sharing the same queue name, with different maxMessages limits
+                from(NATS_URI + "&maxMessages=5&queueName=test").routeId("cons1").to(mockResultEndpoint);
+                from(NATS_URI + "&maxMessages=6&queueName=test").routeId("cons2").to(mockResultEndpoint);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
new file mode 100644
index 0000000..6e7482e
--- /dev/null
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerMaxMessagesTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.io.IOException;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Sends eleven messages ("test", "test1" ... "test10") through the NATS producer route while the
+ * consumer route is configured with {@code maxMessages=5}; the mock endpoint expects the first
+ * five bodies, in order.
+ */
+@Ignore("Require a running Nats server")
+public class NatsConsumerMaxMessagesTest extends CamelTestSupport {
+
+    /** Endpoint URI used by the producer route and, with maxMessages appended, the consumer route. */
+    private static final String NATS_URI = "nats://localhost:4222?topic=test";
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mockResultEndpoint;
+
+    @Test
+    public void testMaxConsumer() throws InterruptedException, IOException {
+        mockResultEndpoint.expectedBodiesReceived("test", "test1", "test2", "test3", "test4");
+        mockResultEndpoint.setExpectedMessageCount(5);
+
+        // The first body has no numeric suffix; the remaining ten are "test1" through "test10".
+        template.sendBody("direct:send", "test");
+        for (int suffix = 1; suffix <= 10; suffix++) {
+            template.sendBody("direct:send", "test" + suffix);
+        }
+
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:send").to(NATS_URI);
+                from(NATS_URI + "&maxMessages=5").to(mockResultEndpoint);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
new file mode 100644
index 0000000..c689ade
--- /dev/null
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsConsumerTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import java.io.IOException;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Round-trip test: one message sent through the NATS producer route must arrive, with the same
+ * body, on the mock endpoint fed by the NATS consumer route.
+ */
+@Ignore("Require a running Nats server")
+public class NatsConsumerTest extends CamelTestSupport {
+
+    /** Endpoint URI shared by the producer and the consumer route. */
+    private static final String NATS_URI = "nats://localhost:4222?topic=test";
+
+    @EndpointInject(uri = "mock:result")
+    protected MockEndpoint mockResultEndpoint;
+
+    @Test
+    public void testConsumer() throws InterruptedException, IOException {
+        final String payload = "test";
+
+        mockResultEndpoint.expectedMessageCount(1);
+        mockResultEndpoint.expectedBodiesReceived(payload);
+
+        template.requestBody("direct:send", payload);
+
+        mockResultEndpoint.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:send").to(NATS_URI);
+                from(NATS_URI).to(mockResultEndpoint);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
new file mode 100644
index 0000000..8f10b4c
--- /dev/null
+++ b/components/camel-nats/src/test/java/org/apache/camel/component/nats/NatsProducerTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.nats;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Smoke test for the NATS producer: sends a single body through a route ending in a NATS
+ * endpoint. The test makes no assertion, so it passes when the send does not throw.
+ */
+@Ignore("Require a running Nats server")
+public class NatsProducerTest extends CamelTestSupport {
+
+    /** Entry point of the route under test. */
+    private static final String START_URI = "direct:send";
+
+    @Test
+    public void sendTest() throws Exception {
+        template.sendBody(START_URI, "pippo");
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(START_URI).to("nats://localhost:4222?topic=test");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/56b815b6/components/camel-nats/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-nats/src/test/resources/log4j.properties b/components/camel-nats/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8aa3fda
--- /dev/null
+++ b/components/camel-nats/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used
+#
+log4j.rootLogger=INFO, file
+
+#log4j.logger.org.apache.camel=DEBUG
+
+# CONSOLE appender not used by default
+log4j.appender.out=org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.file.file=target/camel-nats-test.log
+log4j.appender.file.append=true
+
[3/3] camel git commit: CAMEL-9267: Added Camel-nats to kit and
created feature
Posted by ac...@apache.org.
CAMEL-9267: Added Camel-nats to kit and created feature
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2b22c3f2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2b22c3f2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2b22c3f2
Branch: refs/heads/master
Commit: 2b22c3f287cf71a15fee9155a75ef3635c95186c
Parents: 49f0125
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Jan 18 11:12:42 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Jan 18 11:12:42 2016 +0100
----------------------------------------------------------------------
components/pom.xml | 1 +
parent/pom.xml | 5 +++
.../features/src/main/resources/features.xml | 5 +++
.../apache/camel/itest/karaf/CamelNatsTest.java | 40 ++++++++++++++++++++
4 files changed, 51 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2b22c3f2/components/pom.xml
----------------------------------------------------------------------
diff --git a/components/pom.xml b/components/pom.xml
index 33846eb..53f7dc6 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -172,6 +172,7 @@
<module>camel-mvel</module>
<module>camel-mybatis</module>
<module>camel-nagios</module>
+ <module>camel-nats</module>
<module>camel-netty</module>
<module>camel-netty4</module>
<module>camel-netty-http</module>
http://git-wip-us.apache.org/repos/asf/camel/blob/2b22c3f2/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 9c474f1..d62ed2d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -1296,6 +1296,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-nats</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-netty</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/2b22c3f2/platforms/karaf/features/src/main/resources/features.xml
----------------------------------------------------------------------
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 75ebde4..dc80c1d 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -1094,6 +1094,11 @@
<bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jsendnsca-core/${jsendnsca-bundle-version}</bundle>
<bundle>mvn:org.apache.camel/camel-nagios/${project.version}</bundle>
</feature>
+ <feature name='camel-nats' version='${project.version}' resolver='(obr)' start-level='50'>
+ <feature version='${project.version}'>camel-core</feature>
+ <bundle dependency='true'>wrap:mvn:com.github.tyagihas/java_nats/${java-nats-version}</bundle>
+ <bundle>mvn:org.apache.camel/camel-nats/${project.version}</bundle>
+ </feature>
<feature name='camel-netty' version='${project.version}' resolver='(obr)' start-level='50'>
<feature version='${project.version}'>camel-core</feature>
<bundle dependency='true'>mvn:io.netty/netty/${netty3-version}</bundle>
http://git-wip-us.apache.org/repos/asf/camel/blob/2b22c3f2/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelNatsTest.java
----------------------------------------------------------------------
diff --git a/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelNatsTest.java b/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelNatsTest.java
new file mode 100644
index 0000000..32ae4ae
--- /dev/null
+++ b/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelNatsTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.karaf;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+/**
+ * Karaf integration test for the Camel NATS component, executed with the
+ * Pax Exam JUnit runner.
+ *
+ * All real work is delegated to unqualified helper methods (extractName,
+ * testComponent, configure) that are not defined in this class.
+ * NOTE(review): presumably they are inherited from AbstractFeatureTest - confirm.
+ */
+@RunWith(PaxExam.class)
+public class CamelNatsTest extends AbstractFeatureTest {
+
+ /**
+  * Name of the component under test, computed by extractName from this class.
+  * NOTE(review): presumably resolves to the component/feature name "nats" -
+  * confirm against AbstractFeatureTest.extractName.
+  */
+ public static final String COMPONENT = extractName(CamelNatsTest.class);
+
+ /**
+  * Runs the shared component check for {@link #COMPONENT}.
+  *
+  * @throws Exception propagated from testComponent
+  */
+ @Test
+ public void test() throws Exception {
+ testComponent(COMPONENT);
+ }
+
+ /**
+  * Supplies the Pax Exam options for this test by delegating to the
+  * configure(String) helper with {@link #COMPONENT}.
+  *
+  * @return the Pax Exam options built for the component
+  */
+ @Configuration
+ public static Option[] configure() {
+ return configure(COMPONENT);
+ }
+
+}
\ No newline at end of file
[2/3] camel git commit: CAMEL-9267: Define Java Nats property
Posted by ac...@apache.org.
CAMEL-9267: Define Java Nats property
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/49f0125f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/49f0125f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/49f0125f
Branch: refs/heads/master
Commit: 49f0125f17e470b4a3f5459f536f91d3af3ad862
Parents: 56b815b
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Jan 18 10:14:35 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Jan 18 10:14:35 2016 +0100
----------------------------------------------------------------------
components/camel-nats/pom.xml | 2 +-
parent/pom.xml | 1 +
2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/49f0125f/components/camel-nats/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-nats/pom.xml b/components/camel-nats/pom.xml
index 841e739..f6130a9 100644
--- a/components/camel-nats/pom.xml
+++ b/components/camel-nats/pom.xml
@@ -36,7 +36,7 @@
<dependency>
<groupId>com.github.tyagihas</groupId>
<artifactId>java_nats</artifactId>
- <version>0.5.2</version>
+ <version>${java-nats-version}</version>
</dependency>
<!-- testing -->
<dependency>
http://git-wip-us.apache.org/repos/asf/camel/blob/49f0125f/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index ece2d43..9c474f1 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -264,6 +264,7 @@
<java-apns-bundle-version>1.0.0.Beta6_1</java-apns-bundle-version>
<java-apns-version>1.0.0.Beta6</java-apns-version>
<java-ewah-version>0.7.9</java-ewah-version>
+ <java-nats-version>0.5.2</java-nats-version>
<javacc-maven-plugin-version>2.6</javacc-maven-plugin-version>
<javacrumbs-version>0.22</javacrumbs-version>
<javassist-bundle-version>3.12.1.GA_3</javassist-bundle-version>