You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Sergio Luppo <sl...@yahoo.it.INVALID> on 2019/11/06 14:20:22 UTC

Camel-netty4 to JMS leave opened threads

I'm developing a web application in which, when the user clicks a button, a string is sent over a socket to another server where a Camel server is running.

There, the message is processed to create a new string, which is then sent to a JMS queue.

Communication between the web app, the Camel server and the JMS queue works, but every time I click the button a new pair of NettyServerTCPWorker/NettyEventExecutorGroup threads is created.

Here my server code:
package nettyToJms;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.netty4.NettyConstants;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsConstants;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;

import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Stand-alone Camel application that bridges a TCP socket to a JMS queue.
 *
 * <p>A camel-netty4 consumer listens on port 50300, decodes each incoming
 * message as a string, prefixes it with {@code "Bye "} and sends the result to
 * the JMS queue {@code TEST} (IBM MQ) and then to the {@code stream:out}
 * endpoint.
 */
public class nettyToJmsTemplate {

    /** Base netty4 endpoint URI: TCP on the IPv6 wildcard address, port 50300. */
    private static final String nettyURL = "netty4:tcp://[::]:50300";

    // Only referenced by the commented-out wait loop at the end of start().
    private static Boolean loaded = false;

    /**
     * Builds the Camel context, registers the JMS component and the
     * netty-to-JMS route, and starts the context.
     *
     * <p>The context is never stopped here; the method returns right after
     * {@code context.start()}.
     *
     * @throws Exception if the MQ connection factory cannot be configured or
     *                   the Camel context fails to start
     */
    public void start() throws Exception {
        // Codecs are looked up by name from the endpoint URI ("#stringDecoder").
        // The encoder is registered as well, although the URI below does not
        // reference it.
        SimpleRegistry simpleRegistry = new SimpleRegistry();
        simpleRegistry.put("stringEncoder", new StringEncoder());
        simpleRegistry.put("stringDecoder", new StringDecoder());
        CamelContext context = new DefaultCamelContext(simpleRegistry);

        // IBM MQ client connection to localhost:1414 over SYSTEM.DEF.SVRCONN,
        // with an empty queue manager name.
        JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
        JmsConnectionFactory cf = ff.createConnectionFactory();

        cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");
        cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "");
        cf.setIntProperty(WMQConstants.WMQ_PORT, 1414);
        cf.setStringProperty(WMQConstants.WMQ_CHANNEL, "SYSTEM.DEF.SVRCONN");
        cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "CLSPlusToMQCamel");
        cf.setIntProperty(WMQConstants.ACKNOWLEDGE_MODE, JmsConstants.AUTO_ACKNOWLEDGE);

        context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(cf));
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from(nettyURL + "?decoder=#stringDecoder&disconnect=true&exchangePattern=InOnly")
                    .process(new Processor() {
                        @Override
                        public void process(Exchange exchange) throws Exception {
                            String body = exchange.getIn().getBody(String.class);
                            exchange.getOut().setBody("Bye " + body);
                            // Set unconditionally: per the camel-netty4 docs this header
                            // requests that the channel be closed once the exchange completes.
                            exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);
                        }
                    })
                    .inOnly("jms:TEST")
                    .to("stream:out");
            }
        });

        System.out.println("start context!");
        context.start();
        System.out.println("wait");
        System.out.println("loaded");
        /*
         * while (loaded != true) { System.out.println("in attesan"); } context.stop();
         */
        // System.out.println("stop context!");
    }

    /**
     * Entry point: creates the template and starts the Camel context.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from {@link #start()}
     */
    public static void main(String args[]) throws Exception {
        nettyToJmsTemplate j = new nettyToJmsTemplate();
        j.start();
    }

    /**
     * Returns a processor that prints the incoming message body to stdout.
     * Not used by the route built in {@link #start()}.
     */
    private Processor executeFirstProcessor() {
        return new Processor() {
            @Override
            public void process(Exchange exchange) {
                System.out.println("Hai scritto : " + exchange.getIn().getBody());
            }
        };
    }

    /**
     * Returns a processor that wraps the incoming body in a small XML document
     * and sets it as the out-message body.
     * Not used by the route built in {@link #start()}.
     */
    private Processor processFileContentInXml() {
        return new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                try {
                    String messaggio = (String) exchange.getIn().getBody();
                    System.out.println("msg:" + messaggio);
                    exchange.getOut().setBody(buildXmlFilecontent(messaggio));
                } catch (Exception ex) {
                    // NOTE(review): any failure (e.g. a non-String body) is only
                    // printed and the exchange continues unchanged - confirm that
                    // this best-effort behaviour is intended.
                    System.out.println(ex.getMessage());
                }
            }

            /**
             * Wraps {@code message} in a {@code <messaggio>} element.
             * NOTE(review): the text is inserted without XML escaping.
             */
            private String buildXmlFilecontent(String message) {
                return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><messaggio>" + message + "</messaggio>";
            }
        };
    }
}

Camel-netty4 to JMS leave opened threads

Posted by Sergio Luppo <sl...@yahoo.it.INVALID>.
 I'm developing a web application in which, when the user clicks a button, a string is sent over a socket to another server where a Camel server is running.
There, the message is processed to create a new string, which is then sent to a JMS queue.
Communication between the web app, the Camel server and the JMS queue works, but every time I click the button a new pair of NettyServerTCPWorker/NettyEventExecutorGroup threads is created.
Here my server code:
package nettyToJms;
import org.apache.camel.CamelContext;import org.apache.camel.Exchange;import org.apache.camel.Processor;import org.apache.camel.builder.RouteBuilder;import org.apache.camel.component.jms.JmsComponent;import org.apache.camel.component.netty4.NettyConstants;import org.apache.camel.impl.DefaultCamelContext;import org.apache.camel.impl.SimpleRegistry;import com.ibm.msg.client.jms.JmsConnectionFactory;import com.ibm.msg.client.jms.JmsConstants;import com.ibm.msg.client.jms.JmsFactoryFactory;import com.ibm.msg.client.wmq.WMQConstants;
import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;
public class nettyToJmsTemplate {
private static String nettyURL = "netty4:tcp://[::]:50300";private static Boolean loaded = false;
public void start() throws Exception {

    StringDecoder stringDecoder = new StringDecoder();    SimpleRegistry simpleRegistry = new SimpleRegistry();    simpleRegistry.put("stringEncoder", new StringEncoder());    simpleRegistry.put("stringDecoder", new StringDecoder());    CamelContext context = new DefaultCamelContext(simpleRegistry);
    JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);    JmsConnectionFactory cf = ff.createConnectionFactory();
    cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, "localhost");    cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, "");    cf.setIntProperty(WMQConstants.WMQ_PORT, 1414);    cf.setStringProperty(WMQConstants.WMQ_CHANNEL, "SYSTEM.DEF.SVRCONN");    cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);    cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "CLSPlusToMQCamel");    cf.setIntProperty(WMQConstants.ACKNOWLEDGE_MODE, JmsConstants.AUTO_ACKNOWLEDGE);
    context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(cf));    context.addRoutes(new RouteBuilder() {        public void configure() {            from(nettyURL+"?decoder=#stringDecoder&disconnect=true&exchangePattern=InOnly").process(new Processor() {                public void process(Exchange exchange) throws Exception {                    String body = exchange.getIn().getBody(String.class);                    exchange.getOut().setBody("Bye " + body);                    // some condition which determines if we should close                    exchange.getOut().setBody("Bye " + body);                    exchange.getOut().setHeader(NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE, true);                }            })            .inOnly("jms:TEST").to("stream:out");        }    });    System.out.println("start context!");    context.start();    System.out.println("wait");    System.out.println("loaded");    /*     * while (loaded != true) { System.out.println("in attesan"); } context.stop();     */    // System.out.println("stop context!");}
public static void main(String args[]) throws Exception {    nettyToJmsTemplate j = new nettyToJmsTemplate();    j.start();}
private Processor executeFirstProcessor() {    return new Processor() {        @Override        public void process(Exchange exchange) {            System.out.println("Hai scritto : " + exchange.getIn().getBody());        }    };}
private Processor processFileContentInXml() {    return new Processor() {        @Override        public void process(Exchange exchange) throws Exception {            try {                String messaggio = (String) exchange.getIn().getBody();                System.out.println("msg:" + messaggio);                exchange.getOut().setBody(buildXmlFilecontent(messaggio));            } catch (Exception ex) {                System.out.println(ex.getMessage());            }        }
        private String buildXmlFilecontent(String message)        {        return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><messaggio>" + message + "</messaggio>";        }    };

}}