Posted to users@camel.apache.org by xiangqiuzhao <xi...@gmail.com> on 2012/04/15 06:01:22 UTC

How do I use decoders and encoders in the Netty component?

My URI is:
.to("netty:tcp://localhost:6789?encoders=#myEncoders&decoders=myDecoders&sync=true")

The test main program looks like this:

        Object decoder = new LengthDecoder(); // appctx.getBean("mydecoder");
        Object encoder = new MessageEncoder(10); // appctx.getBean("myencoder");

        SimpleRegistry registry = new SimpleRegistry();

        List<ChannelDownstreamHandler> encoders = new ArrayList<ChannelDownstreamHandler>();
        List<ChannelUpstreamHandler> decoders = new ArrayList<ChannelUpstreamHandler>();
        encoders.add((ChannelDownstreamHandler)encoder);
        decoders.add((ChannelUpstreamHandler)decoder);
        
        registry.put("myEncoders", encoders);
        registry.put("myDecoders", decoders);
        
        CamelContext context = new DefaultCamelContext(registry);

        context.addRoutes(new RouteBuilder() {
            public void configure() throws Exception {
                from("direct:cpsp")
                .process(new MyToProcessor())
                .to("netty:tcp://localhost:6789?encoders=#myEncoders&decoders=myDecoders&sync=true")
                .process(new MyFromProcessor());
            }
        });
        
        context.start();
        
        Endpoint endpoint = context.getEndpoint("direct:cpsp");
        Exchange exchange = endpoint.createExchange();
        String data = "12345238011";
        
        Producer producer = endpoint.createProducer();
        exchange.getIn().setBody(data.trim());
        producer.process(exchange);
        if (exchange.getException() != null)
            throw exchange.getException();
        
        System.out.println("RESULT:" + exchange.getOut().getBody());

class MyToProcessor implements Processor {
    private static final transient Logger LOG = LoggerFactory.getLogger(MyToProcessor.class);
    
    public void process(Exchange exchange) throws Exception {
        try {
            String in = (String)exchange.getIn().getBody();
            System.out.println("IN:" + in.length() + ":" + in);
            exchange.setProperty(Exchange.CHARSET_NAME, "UTF-8");
            exchange.getOut().setBody(in);
        } catch (Exception e) {
            LOG.error("ToProcessor Exception!" + e);
        }
    }
}

The decoder is LengthDecoder:

public class LengthDecoder extends FrameDecoder { 
    private static final transient Logger LOG = LoggerFactory.getLogger(LengthDecoder.class);

    protected Object decode(
            ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        
        LOG.info("in Length decoder");
        
        if (buffer.readableBytes() < 6) { 
            System.out.println("readable not enough");
            return null;
        } 
        buffer.skipBytes(5);
        LOG.info("decoder buffer:" + buffer.toString());
        return buffer.toString(); 
    }
}
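
A side note on the decoder above: it returns buffer.toString(), and the log further down shows that this value is a description of the buffer (BigEndianHeapChannelBuffer(ridx=5, widx=134, cap=134)), not the received bytes. The sketch below shows the same distinction with java.nio.ByteBuffer from the JDK, since that runs without Netty on the classpath. The class name BufferToStringDemo and the helper readText are made up for illustration. I believe Netty 3's ChannelBuffer also has a toString(Charset) overload that decodes the readable bytes, but please treat that as an assumption to check against the Netty version in use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferToStringDemo {

    /** Decodes all remaining bytes of the buffer as UTF-8 text and consumes them. */
    public static String readText(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap("12345238011".getBytes(StandardCharsets.UTF_8));

        // Skip the first 5 bytes, analogous to buffer.skipBytes(5) in the decoder.
        buffer.position(5);

        // toString() without arguments describes the buffer (indexes and capacity).
        System.out.println("description: " + buffer.toString());

        // Reading the bytes and decoding them gives the actual content.
        System.out.println("content: " + readText(buffer));
    }
}
```

If the same holds for ChannelBuffer, the object handed to Camel by the decoder is the description string, so the payload never reaches the exchange body.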

But why is the output RESULT:null? I want to wait when the buffer does not
have enough data yet (fewer than 6 readable bytes), but how do I wait? By
returning null?
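
To make the "return null and wait" idea concrete, here is a minimal sketch of the contract as I understand it for a frame decoder: returning null means "not enough data yet, call me again when more bytes arrive", and a non-null return must consume exactly one complete frame. The sketch does not use Netty; it imitates the behaviour with plain byte arrays so it can run on its own. The class FrameAccumulator, the method feed, and the wire format (a 5-digit ASCII length header followed by the body) are all assumptions made for illustration, not my real protocol.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FrameAccumulator {
    // Assumption: each frame starts with a 5-digit ASCII length header.
    private static final int HEADER_LENGTH = 5;

    // Bytes received so far that have not yet been turned into a frame.
    private byte[] pending = new byte[0];

    /** Returns the body of one complete frame, or null if more bytes are needed. */
    private String decode() {
        if (pending.length < HEADER_LENGTH) {
            return null; // header incomplete: wait for more data
        }
        int bodyLength = Integer.parseInt(
                new String(pending, 0, HEADER_LENGTH, StandardCharsets.US_ASCII));
        int frameLength = HEADER_LENGTH + bodyLength;
        if (pending.length < frameLength) {
            return null; // body incomplete: wait, and consume nothing
        }
        String body = new String(pending, HEADER_LENGTH, bodyLength, StandardCharsets.UTF_8);
        // Consume exactly one frame and keep the rest for the next call.
        pending = Arrays.copyOfRange(pending, frameLength, pending.length);
        return body;
    }

    /** Appends a received chunk and returns every frame that is now complete. */
    public List<String> feed(byte[] chunk) {
        byte[] merged = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);
        pending = merged;

        List<String> frames = new ArrayList<String>();
        String frame;
        while ((frame = decode()) != null) {
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        FrameAccumulator accumulator = new FrameAccumulator();
        // The first chunk ends in the middle of a frame, so nothing is emitted yet.
        System.out.println(accumulator.feed("00006he".getBytes(StandardCharsets.US_ASCII)));
        // The second chunk completes the first frame and carries a whole second one.
        System.out.println(accumulator.feed("llo!00002ok".getBytes(StandardCharsets.US_ASCII)));
    }
}
```

In a Netty 3 FrameDecoder the equivalent checks would presumably use buffer.readableBytes() before reading, and buffer.readBytes(n) to take the frame once it is complete.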

And why does the log look like this:

11:54:38.587 [main] INFO  com.sunyard.camel.MessageEncoder - IN
INCODE:12345238011
11:54:38.602 [New I/O client worker #1-1] DEBUG
o.a.c.component.netty.NettyProducer - Operation complete
org.jboss.netty.channel.DefaultChannelFuture@8ddc4c
11:54:38.602 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.602 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=5, widx=134, cap=134)
11:54:38.602 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=5, widx=134, cap=134)
11:54:38.602 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.602 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=10, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=10, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=15, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=15, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=20, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=20, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=25, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=25, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=30, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=30, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=35, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=35, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=40, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=40, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=45, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=45, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=50, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=50, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=55, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=55, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=60, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=60, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=65, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=65, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=70, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=70, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=75, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=75, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=80, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=80, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=85, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=85, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=90, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=90, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=95, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=95, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=100, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=100, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=105, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=105, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=110, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=110, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=115, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=115, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=120, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=120, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=125, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=125, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - decoder
buffer:BigEndianHeapChannelBuffer(ridx=130, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] DEBUG
o.a.c.c.n.h.ClientChannelHandler - Message received:
BigEndianHeapChannelBuffer(ridx=130, widx=134, cap=134)
11:54:38.618 [New I/O client worker #1-1] INFO 
com.sunyard.camel.LengthDecoder - in Length decoder
readable not enough
RESULT:null
11:54:41.629 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel
2.7.2 (CamelContext:camel-1) is shutting down
11:54:41.629 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to
graceful shutdown 1 routes (timeout 300 seconds)
11:54:41.629 [main] DEBUG o.a.c.i.DefaultExecutorServiceStrategy - Created
new single thread pool for source:
org.apache.camel.impl.DefaultShutdownStrategy@1878144 with name:
ShutdownTask. ->
java.util.concurrent.Executors$FinalizableDelegatedExecutorService@1a6684f
11:54:41.629 [Camel (camel-1) thread #2 - ShutdownTask] DEBUG
o.a.c.impl.DefaultShutdownStrategy - There are 1 routes to shutdown
11:54:41.644 [Camel (camel-1) thread #2 - ShutdownTask] INFO 
o.a.c.impl.DefaultShutdownStrategy - Route: route1 suspension deferred.
11:54:41.644 [Camel (camel-1) thread #2 - ShutdownTask] DEBUG
o.a.c.impl.DefaultShutdownStrategy - Route: route1 preparing to shutdown.
11:54:41.644 [Camel (camel-1) thread #2 - ShutdownTask] INFO 
o.a.c.impl.DefaultShutdownStrategy - Route: route1 preparing to shutdown
complete.
11:54:41.644 [Camel (camel-1) thread #2 - ShutdownTask] DEBUG
o.a.c.impl.DefaultShutdownStrategy - Shutdown complete for:
Consumer[direct://cpsp]
11:54:41.644 [Camel (camel-1) thread #2 - ShutdownTask] INFO 
o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete.
11:54:41.644 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful
shutdown of 1 routes completed in 0 seconds
11:54:41.644 [main] DEBUG o.a.c.m.DefaultManagementAgent - Unregistered
MBean with objectname:
org.apache.camel:context=zhaoxq-notebook/camel-1,type=routes,name="route1"
11:54:41.644 [main] DEBUG org.apache.camel.impl.RouteService - Stopping
services on route: route1
11:54:41.644 [main] DEBUG o.a.c.m.DefaultManagementAgent - Unregistered
MBean with objectname:
org.apache.camel:context=zhaoxq-notebook/camel-1,type=consumers,name=DirectConsumer(0x1343ed0)
11:54:41.644 [main] DEBUG o.a.c.m.DefaultManagementAgent - Unregistered
MBean with objectname:
org.apache.camel:context=zhaoxq-notebook/camel-1,type=processors,name="process1"
11:54:41.644 [main] DEBUG o.a.c.m.DefaultManagementAgent - Unregistered
MBean with objectname:
org.apache.camel:context=zhaoxq-notebook/camel-1,type=processors,name="to1"
11:54:41.644 [main] DEBUG o.a.c.m.DefaultManagementAgent - Unregistered
MBean with objectname:
org.apache.camel:context=zhaoxq-notebook/camel-1,type=processors,name="process2"
11:54:41.644 [main] DEBUG o.a.c.m.DefaultManagementAgent - Unregistered
MBean with objectname:
org.apache.camel:context=zhaoxq-notebook/camel-1,type=context,name="camel-1"
11:54:41.644 [main] DEBUG o.a.c.i.DefaultExecutorServiceStrategy -
ShutdownNow ExecutorService:
java.util.concurrent.Executors$FinalizableDelegatedExecutorService@1a6684f
11:54:41.644 [main] INFO  o.a.c.impl.DefaultInflightRepository - Shutting
down with no inflight exchanges.
11:54:41.644 [main] DEBUG o.a.c.impl.SharedProducerServicePool - Stopping
service pool: org.apache.camel.impl.SharedProducerServicePool@165a3c2
11:54:41.644 [main] DEBUG o.a.c.component.netty.NettyProducer - Stopping
producer at address: localhost:6789
11:54:41.660 [main] INFO  com.sunyard.camel.LengthDecoder - in Length
decoder
readable not enough
11:54:41.661 [main] INFO  com.sunyard.camel.LengthDecoder - in Length
decoder
readable not enough
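
Reading the log above: the same 134-byte buffer (widx=134) is passed to the decoder again and again, ridx moves forward by 5 on each call, and every call is followed by a "Message received" entry, until only 4 bytes are left and "readable not enough" is printed. That looks as if the decoder is called repeatedly for as long as it returns a non-null object and bytes remain. The small simulation below only reproduces this arithmetic; it is not Netty code, and the class and method names are made up for illustration.

```java
public class DecodeLoopSimulation {

    /**
     * Simulates a decoder that needs at least minReadable bytes and skips
     * skipBytes per call. Returns {messages emitted, bytes left over}.
     */
    public static int[] simulate(int totalBytes, int minReadable, int skipBytes) {
        int readerIndex = 0;
        int messages = 0;
        // Keep decoding while the "enough data" check in the decoder passes.
        while (totalBytes - readerIndex >= minReadable) {
            readerIndex += skipBytes; // the decoder only skips 5 bytes per call
            messages++;               // each non-null return becomes one message
        }
        return new int[] {messages, totalBytes - readerIndex};
    }

    public static void main(String[] args) {
        // Values taken from the decoder and the log: 134 bytes, check < 6, skip 5.
        int[] result = simulate(134, 6, 5);
        System.out.println("messages=" + result[0] + ", leftover bytes=" + result[1]);
    }
}
```

With the values from my decoder (134 bytes, a minimum of 6, a skip of 5) the simulation gives 26 messages and 4 leftover bytes, which matches the 26 "Message received" entries (ridx=5 up to ridx=130) in the log.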

--
View this message in context: http://camel.465427.n5.nabble.com/how-to-use-decoder-and-encoder-in-netty-component-tp5641420p5641420.html