You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/02/21 23:35:27 UTC
svn commit: r1073178 -
/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
Author: cmueller
Date: Mon Feb 21 22:35:27 2011
New Revision: 1073178
URL: http://svn.apache.org/viewvc?rev=1073178&view=rev
Log:
CAMEL-3656: Add support for asynchronous SMPP interaction - work in progress
Modified:
camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java?rev=1073178&r1=1073177&r2=1073178&view=diff
==============================================================================
--- camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java (original)
+++ camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java Mon Feb 21 22:35:27 2011
@@ -17,9 +17,12 @@
package org.apache.camel.component.smpp;
import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.impl.DefaultProducer;
import org.jsmpp.DefaultPDUReader;
import org.jsmpp.DefaultPDUSender;
@@ -47,7 +50,7 @@ import org.slf4j.LoggerFactory;
* @version
* @author muellerc
*/
-public class SmppProducer extends DefaultProducer {
+public class SmppProducer extends DefaultAsyncProducer {
private static final transient Logger LOG = LoggerFactory.getLogger(SmppProducer.class);
@@ -114,40 +117,58 @@ public class SmppProducer extends Defaul
}
}
- public void process(Exchange exchange) throws Exception {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending a short message for exchange id '"
+ exchange.getExchangeId() + "'...");
}
+ if (!isRunAllowed()) {
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ callback.done(true);
+ return true;
+ }
+
// only possible by trying to reconnect
if (this.session == null) {
- throw new IOException("Lost connection to " + getEndpoint().getConnectionString() + " and yet not reconnected");
+ exchange.setException(new IOException("Lost connection to " + getEndpoint().getConnectionString() + " and yet not reconnected"));
+ callback.done(true);
+ return true;
}
- SubmitSm submitSm = getEndpoint().getBinding().createSubmitSm(exchange);
- String messageId = session.submitShortMessage(
- submitSm.getServiceType(),
- TypeOfNumber.valueOf(submitSm.getSourceAddrTon()),
- NumberingPlanIndicator.valueOf(submitSm.getSourceAddrNpi()),
- submitSm.getSourceAddr(),
- TypeOfNumber.valueOf(submitSm.getDestAddrTon()),
- NumberingPlanIndicator.valueOf(submitSm.getDestAddrNpi()),
- submitSm.getDestAddress(),
- new ESMClass(),
- submitSm.getProtocolId(),
- submitSm.getPriorityFlag(),
- submitSm.getScheduleDeliveryTime(),
- submitSm.getValidityPeriod(),
- new RegisteredDelivery(submitSm.getRegisteredDelivery()),
- submitSm.getReplaceIfPresent(),
- new GeneralDataCoding(
- false,
- false,
- MessageClass.CLASS1,
- Alphabet.valueOf(submitSm.getDataCoding())),
- (byte) 0,
- submitSm.getShortMessage());
+ SubmitSm submitSm = null;
+ String messageId = null;
+ try {
+ submitSm = getEndpoint().getBinding().createSubmitSm(exchange);
+ messageId = session.submitShortMessage(
+ submitSm.getServiceType(),
+ TypeOfNumber.valueOf(submitSm.getSourceAddrTon()),
+ NumberingPlanIndicator.valueOf(submitSm.getSourceAddrNpi()),
+ submitSm.getSourceAddr(),
+ TypeOfNumber.valueOf(submitSm.getDestAddrTon()),
+ NumberingPlanIndicator.valueOf(submitSm.getDestAddrNpi()),
+ submitSm.getDestAddress(),
+ new ESMClass(),
+ submitSm.getProtocolId(),
+ submitSm.getPriorityFlag(),
+ submitSm.getScheduleDeliveryTime(),
+ submitSm.getValidityPeriod(),
+ new RegisteredDelivery(submitSm.getRegisteredDelivery()),
+ submitSm.getReplaceIfPresent(),
+ new GeneralDataCoding(
+ false,
+ false,
+ MessageClass.CLASS1,
+ Alphabet.valueOf(submitSm.getDataCoding())),
+ (byte) 0,
+ submitSm.getShortMessage());
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Sent a short message for exchange id '"
@@ -166,6 +187,9 @@ public class SmppProducer extends Defaul
}
exchange.getIn().setHeader(SmppBinding.ID, messageId);
}
+
+ // continue routing asynchronously
+ return false;
}
@Override