You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/01/02 00:32:25 UTC
svn commit: r1054321 - in
/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina:
MinaConsumer.java MinaProducer.java
Author: cmueller
Date: Sat Jan 1 23:32:25 2011
New Revision: 1054321
URL: http://svn.apache.org/viewvc?rev=1054321&view=rev
Log:
MinaConsumer and MinaProducer now use the endpoint reference from its super class and doesn'n manager its own instance variable
Modified:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java?rev=1054321&r1=1054320&r2=1054321&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java Sat Jan 1 23:32:25 2011
@@ -40,7 +40,6 @@ import org.apache.mina.common.IoSession;
public class MinaConsumer extends DefaultConsumer {
private static final transient Log LOG = LogFactory.getLog(MinaConsumer.class);
- private final MinaEndpoint endpoint;
private final SocketAddress address;
private final IoAcceptor acceptor;
private boolean sync;
@@ -48,7 +47,6 @@ public class MinaConsumer extends Defaul
public MinaConsumer(final MinaEndpoint endpoint, Processor processor) {
super(endpoint, processor);
- this.endpoint = endpoint;
this.address = endpoint.getAddress();
this.acceptor = endpoint.getAcceptor();
this.sync = endpoint.getConfiguration().isSync();
@@ -63,7 +61,7 @@ public class MinaConsumer extends Defaul
}
IoHandler handler = new ReceiveHandler();
- acceptor.bind(address, handler, endpoint.getAcceptorConfig());
+ acceptor.bind(address, handler, getEndpoint().getAcceptorConfig());
}
@Override
@@ -74,6 +72,11 @@ public class MinaConsumer extends Defaul
acceptor.unbind(address);
super.doStop();
}
+
+ @Override
+ public MinaEndpoint getEndpoint() {
+ return (MinaEndpoint) super.getEndpoint();
+ }
/**
* Handles consuming messages and replying if the exchange is out capable.
@@ -99,15 +102,15 @@ public class MinaConsumer extends Defaul
Object in = object;
if (in instanceof byte[]) {
// byte arrays is not readable so convert to string
- in = endpoint.getCamelContext().getTypeConverter().convertTo(String.class, in);
+ in = getEndpoint().getCamelContext().getTypeConverter().convertTo(String.class, in);
}
LOG.debug("Received body: " + in);
}
- Exchange exchange = endpoint.createExchange(session, object);
+ Exchange exchange = getEndpoint().createExchange(session, object);
//Set the exchange charset property for converting
- if (endpoint.getConfiguration().getCharsetName() != null) {
- exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(endpoint.getConfiguration().getCharsetName()));
+ if (getEndpoint().getConfiguration().getCharsetName() != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(getEndpoint().getConfiguration().getCharsetName()));
}
try {
@@ -120,13 +123,13 @@ public class MinaConsumer extends Defaul
if (sync) {
Object body;
if (ExchangeHelper.isOutCapable(exchange)) {
- body = MinaPayloadHelper.getOut(endpoint, exchange);
+ body = MinaPayloadHelper.getOut(getEndpoint(), exchange);
} else {
- body = MinaPayloadHelper.getIn(endpoint, exchange);
+ body = MinaPayloadHelper.getIn(getEndpoint(), exchange);
}
boolean failed = exchange.isFailed();
- if (failed && !endpoint.getConfiguration().isTransferExchange()) {
+ if (failed && !getEndpoint().getConfiguration().isTransferExchange()) {
if (exchange.getException() != null) {
body = exchange.getException();
} else {
@@ -137,7 +140,7 @@ public class MinaConsumer extends Defaul
if (body == null) {
noReplyLogger.log("No payload to send as reply for exchange: " + exchange);
- if (endpoint.getConfiguration().isDisconnectOnNoReply()) {
+ if (getEndpoint().getConfiguration().isDisconnectOnNoReply()) {
// must close session if no data to write otherwise client will never receive a response
// and wait forever (if not timing out)
if (LOG.isDebugEnabled()) {
@@ -163,7 +166,7 @@ public class MinaConsumer extends Defaul
}
// should we disconnect, the header can override the configuration
- boolean disconnect = endpoint.getConfiguration().isDisconnect();
+ boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
if (close != null) {
disconnect = close;
}
@@ -175,6 +178,4 @@ public class MinaConsumer extends Defaul
}
}
}
-
-}
-
+}
\ No newline at end of file
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=1054321&r1=1054320&r2=1054321&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Sat Jan 1 23:32:25 2011
@@ -45,7 +45,6 @@ import org.apache.mina.transport.socket.
public class MinaProducer extends DefaultProducer implements ServicePoolAware {
private static final transient Log LOG = LogFactory.getLog(MinaProducer.class);
private IoSession session;
- private MinaEndpoint endpoint;
private CountDownLatch latch;
private boolean lazySessionCreation;
private long timeout;
@@ -55,12 +54,16 @@ public class MinaProducer extends Defaul
public MinaProducer(MinaEndpoint endpoint) {
super(endpoint);
- this.endpoint = endpoint;
this.lazySessionCreation = endpoint.getConfiguration().isLazySessionCreation();
this.timeout = endpoint.getConfiguration().getTimeout();
this.sync = endpoint.getConfiguration().isSync();
this.noReplyLogger = new Logger(LOG, endpoint.getConfiguration().getNoReplyLogLevel());
}
+
+ @Override
+ public MinaEndpoint getEndpoint() {
+ return (MinaEndpoint) super.getEndpoint();
+ }
@Override
public boolean isSingleton() {
@@ -78,19 +81,19 @@ public class MinaProducer extends Defaul
}
// set the exchange encoding property
- if (endpoint.getConfiguration().getCharsetName() != null) {
- exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(endpoint.getConfiguration().getCharsetName()));
+ if (getEndpoint().getConfiguration().getCharsetName() != null) {
+ exchange.setProperty(Exchange.CHARSET_NAME, IOConverter.normalizeCharset(getEndpoint().getConfiguration().getCharsetName()));
}
- Object body = MinaPayloadHelper.getIn(endpoint, exchange);
+ Object body = MinaPayloadHelper.getIn(getEndpoint(), exchange);
if (body == null) {
noReplyLogger.log("No payload to send for exchange: " + exchange);
return; // exit early since nothing to write
}
// if textline enabled then covert to a String which must be used for textline
- if (endpoint.getConfiguration().isTextline()) {
- body = endpoint.getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
+ if (getEndpoint().getConfiguration().isTextline()) {
+ body = getEndpoint().getCamelContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, body);
}
// if sync is true then we should also wait for a response (synchronous mode)
@@ -130,7 +133,7 @@ public class MinaProducer extends Defaul
throw new CamelExchangeException("Error occurred in ResponseHandler", exchange, handler.getCause());
} else if (!handler.isMessageReceived()) {
// no message received
- throw new CamelExchangeException("No response received from remote server: " + endpoint.getEndpointUri(), exchange);
+ throw new CamelExchangeException("No response received from remote server: " + getEndpoint().getEndpointUri(), exchange);
} else {
// set the result on either IN or OUT on the original exchange depending on its pattern
if (ExchangeHelper.isOutCapable(exchange)) {
@@ -150,13 +153,13 @@ public class MinaProducer extends Defaul
}
// should we disconnect, the header can override the configuration
- boolean disconnect = endpoint.getConfiguration().isDisconnect();
+ boolean disconnect = getEndpoint().getConfiguration().isDisconnect();
if (close != null) {
disconnect = close;
}
if (disconnect) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Closing session when complete at address: " + endpoint.getAddress());
+ LOG.debug("Closing session when complete at address: " + getEndpoint().getAddress());
}
session.close();
}
@@ -173,7 +176,7 @@ public class MinaProducer extends Defaul
@Override
protected void doStop() throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("Stopping connector: " + connector + " at address: " + endpoint.getAddress());
+ LOG.debug("Stopping connector: " + connector + " at address: " + getEndpoint().getAddress());
}
closeConnection();
super.doStop();
@@ -196,14 +199,14 @@ public class MinaProducer extends Defaul
}
private void openConnection() {
- SocketAddress address = endpoint.getAddress();
- connector = endpoint.getConnector();
+ SocketAddress address = getEndpoint().getAddress();
+ connector = getEndpoint().getConnector();
if (LOG.isDebugEnabled()) {
LOG.debug("Creating connector to address: " + address + " using connector: " + connector + " timeout: " + timeout + " millis.");
}
- IoHandler ioHandler = new ResponseHandler(endpoint);
+ IoHandler ioHandler = new ResponseHandler(getEndpoint());
// connect and wait until the connection is established
- ConnectFuture future = connector.connect(address, ioHandler, endpoint.getConnectorConfig());
+ ConnectFuture future = connector.connect(address, ioHandler, getEndpoint().getConnectorConfig());
future.join();
session = future.getSession();
}