You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Sergio Luppo <sl...@yahoo.it.INVALID> on 2019/11/06 14:20:22 UTC
Camel-netty4 to JMS leave opened threads
I'm developing a web application that when user click on a button, a string is sent via socket to another server where a camel server is running.
Here message is processed to create a new string that will be sent to a jms queue.
Communications from webapp / camel server / jms queue works but every time I click on button a nw couple of NettyServerTCPWorker/NettyEventExecutorGroup being created.
Here my server code:
package nettyToJms;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.netty4.NettyConstants;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsConstants;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class nettyToJmsTemplate {
private static String nettyURL = "netty4:tcp://[::]:50300";
private static Boolean loaded = false;
public void start() throws Exception {
StringDecoder stringDecoder = new StringDecoder();
SimpleRegistry simpleRegistry = new SimpleRegistry();
simpleRegistry.put("stringEncoder", new StringEncoder());
simpleRegistry.put("stringDecoder", new StringDecoder());
CamelContext context = new DefaultCamelContext(simpleRegistry);
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
JmsConnectionFactory cf = ff.createConnectionFactory();
cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "");
cf.setIntProperty(WMQConstants.WMQ_PORT, 1414);
cf.setStringProperty(WMQConstants.WMQ_CHANNEL, "SYSTEM.DEF.SVRCONN");
cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "CLSPlusToMQCamel");
cf.setIntProperty(WMQConstants.ACKNOWLEDGE_MODE, JmsConstants.AUTO_ACKNOWLEDGE);
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(cf));
context.addRoutes(new RouteBuilder() {
public void configure() {
from(nettyURL+"?decoder=#stringDecoder&disconnect=true&exchangePattern=InOnly").process(new Processor() {
public void process(Exchange exchange) throws Exception {
String body = exchange.getIn().getBody(String.class);
exchange.getOut().setBody("Bye " + body);
// some condition which determines if we should close
exchange.getOut().setBody("Bye " + body);
exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
}
})
.inOnly("jms:TEST").to("stream:out");
}
});
System.out.println("start context!");
context.start();
System.out.println("wait");
System.out.println("loaded");
/*
* while (loaded != true) { System.out.println("in attesan"); } context.stop();
*/
// System.out.println("stop context!");
}
public static void main(String args[]) throws Exception {
nettyToJmsTemplate j = new nettyToJmsTemplate();
j.start();
}
private Processor executeFirstProcessor() {
return new Processor() {
@Override
public void process(Exchange exchange) {
System.out.println("Hai scritto : " + exchange.getIn().getBody());
}
};
}
private Processor processFileContentInXml() {
return new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
try {
String messaggio = (String) exchange.getIn().getBody();
System.out.println("msg:" + messaggio);
exchange.getOut().setBody(buildXmlFilecontent(messaggio));
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
private String buildXmlFilecontent(String message)
{
return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><messaggio>" + message + "</messaggio>";
}
};
}
}
Camel-netty4 to JMS leave opened threads
Posted by Sergio Luppo <sl...@yahoo.it.INVALID>.
I'm developing a web application that when user click on a button, a string is sent via socket to another server where a camel server is running.
Here message is processed to create a new string that will be sent to a jms queue.
Communications from webapp / camel server / jms queue works but every time I click on button a nw couple of NettyServerTCPWorker/NettyEventExecutorGroup being created.
Here my server code:
package nettyToJms;
import org.apache.camel.CamelContext;import org.apache.camel.Exchange;import org.apache.camel.Processor;import org.apache.camel.builder.RouteBuilder;import org.apache.camel.component.jms.JmsComponent;import org.apache.camel.component.netty4.NettyConstants;import org.apache.camel.impl.DefaultCamelContext;import org.apache.camel.impl.SimpleRegistry;import com.ibm.msg.client.jms.JmsConnectionFactory;import com.ibm.msg.client.jms.JmsConstants;import com.ibm.msg.client.jms.JmsFactoryFactory;import com.ibm.msg.client.wmq.WMQConstants;
import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;
public class nettyToJmsTemplate {
private static String nettyURL = "netty4:tcp://[::]:50300";private static Boolean loaded = false;
public void start() throws Exception {
StringDecoder stringDecoder = new StringDecoder(); SimpleRegistry simpleRegistry = new SimpleRegistry(); simpleRegistry.put("stringEncoder", new StringEncoder()); simpleRegistry.put("stringDecoder", new StringDecoder()); CamelContext context = new DefaultCamelContext(simpleRegistry);
JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER); JmsConnectionFactory cf = ff.createConnectionFactory();
cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost"); cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, ""); cf.setIntProperty(WMQConstants.WMQ_PORT, 1414); cf.setStringProperty(WMQConstants.WMQ_CHANNEL, "SYSTEM.DEF.SVRCONN"); cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT); cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "CLSPlusToMQCamel"); cf.setIntProperty(WMQConstants.ACKNOWLEDGE_MODE, JmsConstants.AUTO_ACKNOWLEDGE);
context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(cf)); context.addRoutes(new RouteBuilder() { public void configure() { from(nettyURL+"?decoder=#stringDecoder&disconnect=true&exchangePattern=InOnly").process(new Processor() { public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); exchange.getOut().setBody("Bye " + body); // some condition which determines if we should close exchange.getOut().setBody("Bye " + body); exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true); } }) .inOnly("jms:TEST").to("stream:out"); } }); System.out.println("start context!"); context.start(); System.out.println("wait"); System.out.println("loaded"); /* * while (loaded != true) { System.out.println("in attesan"); } context.stop(); */ // System.out.println("stop context!");}
public static void main(String args[]) throws Exception { nettyToJmsTemplate j = new nettyToJmsTemplate(); j.start();}
private Processor executeFirstProcessor() { return new Processor() { @Override public void process(Exchange exchange) { System.out.println("Hai scritto : " + exchange.getIn().getBody()); } };}
private Processor processFileContentInXml() { return new Processor() { @Override public void process(Exchange exchange) throws Exception { try { String messaggio = (String) exchange.getIn().getBody(); System.out.println("msg:" + messaggio); exchange.getOut().setBody(buildXmlFilecontent(messaggio)); } catch (Exception ex) { System.out.println(ex.getMessage()); } }
private String buildXmlFilecontent(String message) { return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><messaggio>" + message + "</messaggio>"; } };
}}