You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/01/02 00:32:25 UTC

svn commit: r1054321 - in /camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina: MinaConsumer.java MinaProducer.java

Author: cmueller
Date: Sat Jan  1 23:32:25 2011
New Revision: 1054321

URL: http://svn.apache.org/viewvc?rev=1054321&view=rev
Log:
MinaConsumer and MinaProducer now use the endpoint reference from their superclass and no longer manage their own instance variable

Modified:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=1054321&r1=1054320&r2=1054321&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat Jan  1 23:32:25 2011
@@ -40,7 +40,6 @@ import org.apache.mina.common.IoSession;
 public class MinaConsumer extends DefaultConsumer {
     private static final transient Log LOG = LogFactory.getLog(MinaConsumer.class);
 
-    private final MinaEndpoint endpoint;
     private final SocketAddress address;
     private final IoAcceptor acceptor;
     private boolean sync;
@@ -48,7 +47,6 @@ public class MinaConsumer extends Defaul
 
     public MinaConsumer(final MinaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        this.endpoint = endpoint;
         this.address = endpoint.getAddress();
         this.acceptor = endpoint.getAcceptor();
         this.sync = endpoint.getConfiguration().isSync();
@@ -63,7 +61,7 @@ public class MinaConsumer extends Defaul
         }
 
         IoHandler handler = new ReceiveHandler();
-        acceptor.bind(address, handler, endpoint.getAcceptorConfig());
+        acceptor.bind(address, handler, getEndpoint().getAcceptorConfig());
     }
 
     @Override
@@ -74,6 +72,11 @@ public class MinaConsumer extends Defaul
         acceptor.unbind(address);
         super.doStop();
     }
+    
+    @Override
+    public MinaEndpoint getEndpoint() {
+        return (MinaEndpoint) super.getEndpoint();
+    }
 
     /**
      * Handles consuming messages and replying if the exchange is out capable.
@@ -99,15 +102,15 @@ public class MinaConsumer extends Defaul
                 Object in = object;
                 if (in instanceof byte[]) {
                     // byte arrays is not readable so convert to string
-                    in = endpoint.getCamelContext().getTypeConverter().convertTo(String.class, in);
+                    in = getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in);
                 }
                 LOG.debug("Received body: " + in);
             }
 
-            Exchange exchange = endpoint.createExchange(session, object);
+            Exchange exchange = getEndpoint().createExchange(session, object);
             //Set the exchange charset property for converting
-            if (endpoint.getConfiguration().getCharsetName() != null) {
-                exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(endpoint.getConfiguration().getCharsetName()));
+            if (getEndpoint().getConfiguration().getCharsetName() != null) {
+                exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(getEndpoint().getConfiguration().getCharsetName()));
             }
 
             try {
@@ -120,13 +123,13 @@ public class MinaConsumer extends Defaul
             if (sync) {
                 Object body;
                 if (ExchangeHelper.isOutCapable(exchange)) {
-                    body = MinaPayloadHelper.getOut(endpoint, exchange);
+                    body = MinaPayloadHelper.getOut(getEndpoint(), exchange);
                 } else {
-                    body = MinaPayloadHelper.getIn(endpoint, exchange);
+                    body = MinaPayloadHelper.getIn(getEndpoint(), exchange);
                 }
 
                 boolean failed = exchange.isFailed();
-                if (failed && !endpoint.getConfiguration().isTransferExchange()) {
+                if (failed && !getEndpoint().getConfiguration().isTransferExchange()) {
                     if (exchange.getException() != null) {
                         body = exchange.getException();
                     } else {
@@ -137,7 +140,7 @@ public class MinaConsumer extends Defaul
 
                 if (body == null) {
                     noReplyLogger.log("No payload to send as reply for exchange: " + exchange);
-                    if (endpoint.getConfiguration().isDisconnectOnNoReply()) {
+                    if (getEndpoint().getConfiguration().isDisconnectOnNoReply()) {
                         // must close session if no data to write otherwise client will never receive a response
                         // and wait forever (if not timing out)
                         if (LOG.isDebugEnabled()) {
@@ -163,7 +166,7 @@ public class MinaConsumer extends Defaul
             }
 
             // should we disconnect, the header can override the configuration
-            boolean disconnect = endpoint.getConfiguration().isDisconnect();
+            boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
             if (close != null) {
                 disconnect = close;
             }
@@ -175,6 +178,4 @@ public class MinaConsumer extends Defaul
             }
         }
     }
-
-}
-
+}
\ No newline at end of file

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=1054321&r1=1054320&r2=1054321&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat Jan  1 23:32:25 2011
@@ -45,7 +45,6 @@ import org.apache.mina.transport.socket.
 public class MinaProducer extends DefaultProducer implements ServicePoolAware {
     private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
     private IoSession session;
-    private MinaEndpoint endpoint;
     private CountDownLatch latch;
     private boolean lazySessionCreation;
     private long timeout;
@@ -55,12 +54,16 @@ public class MinaProducer extends Defaul
 
     public MinaProducer(MinaEndpoint endpoint) {
         super(endpoint);
-        this.endpoint = endpoint;
         this.lazySessionCreation = endpoint.getConfiguration().isLazySessionCreation();
         this.timeout = endpoint.getConfiguration().getTimeout();
         this.sync = endpoint.getConfiguration().isSync();
         this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel());
     }
+    
+    @Override
+    public MinaEndpoint getEndpoint() {
+        return (MinaEndpoint) super.getEndpoint();
+    }
 
     @Override
     public boolean isSingleton() {
@@ -78,19 +81,19 @@ public class MinaProducer extends Defaul
         }
 
         // set the exchange encoding property
-        if (endpoint.getConfiguration().getCharsetName() != null) {
-            exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(endpoint.getConfiguration().getCharsetName()));
+        if (getEndpoint().getConfiguration().getCharsetName() != null) {
+            exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(getEndpoint().getConfiguration().getCharsetName()));
         }
 
-        Object body = MinaPayloadHelper.getIn(endpoint, exchange);
+        Object body = MinaPayloadHelper.getIn(getEndpoint(), exchange);
         if (body == null) {
             noReplyLogger.log("No payload to send for exchange: " + exchange);
             return; // exit early since nothing to write
         }
 
         // if textline enabled then covert to a String which must be used for textline
-        if (endpoint.getConfiguration().isTextline()) {
-            body = endpoint.getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+        if (getEndpoint().getConfiguration().isTextline()) {
+            body = getEndpoint().getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
         }
 
         // if sync is true then we should also wait for a response (synchronous mode)
@@ -130,7 +133,7 @@ public class MinaProducer extends Defaul
                 throw new CamelExchangeException("Error occurred in ResponseHandler", exchange, handler.getCause());
             } else if (!handler.isMessageReceived()) {
                 // no message received
-                throw new CamelExchangeException("No response received from remote server: " + endpoint.getEndpointUri(), exchange);
+                throw new CamelExchangeException("No response received from remote server: " + getEndpoint().getEndpointUri(), exchange);
             } else {
                 // set the result on either IN or OUT on the original exchange depending on its pattern
                 if (ExchangeHelper.isOutCapable(exchange)) {
@@ -150,13 +153,13 @@ public class MinaProducer extends Defaul
         }
 
         // should we disconnect, the header can override the configuration
-        boolean disconnect = endpoint.getConfiguration().isDisconnect();
+        boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
         if (close != null) {
             disconnect = close;
         }
         if (disconnect) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("Closing session when complete at address: " + endpoint.getAddress());
+                LOG.debug("Closing session when complete at address: " + getEndpoint().getAddress());
             }
             session.close();
         }
@@ -173,7 +176,7 @@ public class MinaProducer extends Defaul
     @Override
     protected void doStop() throws Exception {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Stopping connector: " + connector + " at address: " + endpoint.getAddress());
+            LOG.debug("Stopping connector: " + connector + " at address: " + getEndpoint().getAddress());
         }
         closeConnection();
         super.doStop();
@@ -196,14 +199,14 @@ public class MinaProducer extends Defaul
     }
 
     private void openConnection() {
-        SocketAddress address = endpoint.getAddress();
-        connector = endpoint.getConnector();
+        SocketAddress address = getEndpoint().getAddress();
+        connector = getEndpoint().getConnector();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Creating connector to address: " + address + " using connector: " + connector + " timeout: " + timeout + " millis.");
         }
-        IoHandler ioHandler = new ResponseHandler(endpoint);
+        IoHandler ioHandler = new ResponseHandler(getEndpoint());
         // connect and wait until the connection is established
-        ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConnectorConfig());
+        ConnectFuture future = connector.connect(address, ioHandler, getEndpoint().getConnectorConfig());
         future.join();
         session = future.getSession();
     }