You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2011/02/21 23:35:27 UTC

svn commit: r1073178 - /camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java

Author: cmueller
Date: Mon Feb 21 22:35:27 2011
New Revision: 1073178

URL: http://svn.apache.org/viewvc?rev=1073178&view=rev
Log:
CAMEL-3656: Add support for Asynchronous smpp interaction - work in progress

Modified:
    camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java

Modified: camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java?rev=1073178&r1=1073177&r2=1073178&view=diff
==============================================================================
--- camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java (original)
+++ camel/trunk/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java Mon Feb 21 22:35:27 2011
@@ -17,9 +17,12 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.impl.DefaultProducer;
 import org.jsmpp.DefaultPDUReader;
 import org.jsmpp.DefaultPDUSender;
@@ -47,7 +50,7 @@ import org.slf4j.LoggerFactory;
  * @version 
  * @author muellerc
  */
-public class SmppProducer extends DefaultProducer {
+public class SmppProducer extends DefaultAsyncProducer {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(SmppProducer.class);
 
@@ -114,40 +117,58 @@ public class SmppProducer extends Defaul
         }
     }
 
-    public void process(Exchange exchange) throws Exception {
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Sending a short message for exchange id '"
                     + exchange.getExchangeId() + "'...");
         }
         
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(true);
+            return true;
+        }
+        
         // only possible by trying to reconnect 
         if (this.session == null) {
-            throw new IOException("Lost connection to " + getEndpoint().getConnectionString() + " and yet not reconnected");
+            exchange.setException(new IOException("Lost connection to " + getEndpoint().getConnectionString() + " and yet not reconnected"));
+            callback.done(true);
+            return true;
         }
 
-        SubmitSm submitSm = getEndpoint().getBinding().createSubmitSm(exchange);
-        String messageId = session.submitShortMessage(
-                submitSm.getServiceType(), 
-                TypeOfNumber.valueOf(submitSm.getSourceAddrTon()),
-                NumberingPlanIndicator.valueOf(submitSm.getSourceAddrNpi()),
-                submitSm.getSourceAddr(),
-                TypeOfNumber.valueOf(submitSm.getDestAddrTon()),
-                NumberingPlanIndicator.valueOf(submitSm.getDestAddrNpi()),
-                submitSm.getDestAddress(),
-                new ESMClass(),
-                submitSm.getProtocolId(),
-                submitSm.getPriorityFlag(),
-                submitSm.getScheduleDeliveryTime(),
-                submitSm.getValidityPeriod(),
-                new RegisteredDelivery(submitSm.getRegisteredDelivery()),
-                submitSm.getReplaceIfPresent(),
-                new GeneralDataCoding(
-                        false,
-                        false,
-                        MessageClass.CLASS1,
-                        Alphabet.valueOf(submitSm.getDataCoding())),
-                (byte) 0,
-                submitSm.getShortMessage());
+        SubmitSm submitSm = null;
+        String messageId = null;
+        try {
+            submitSm = getEndpoint().getBinding().createSubmitSm(exchange);
+            messageId = session.submitShortMessage(
+                    submitSm.getServiceType(), 
+                    TypeOfNumber.valueOf(submitSm.getSourceAddrTon()),
+                    NumberingPlanIndicator.valueOf(submitSm.getSourceAddrNpi()),
+                    submitSm.getSourceAddr(),
+                    TypeOfNumber.valueOf(submitSm.getDestAddrTon()),
+                    NumberingPlanIndicator.valueOf(submitSm.getDestAddrNpi()),
+                    submitSm.getDestAddress(),
+                    new ESMClass(),
+                    submitSm.getProtocolId(),
+                    submitSm.getPriorityFlag(),
+                    submitSm.getScheduleDeliveryTime(),
+                    submitSm.getValidityPeriod(),
+                    new RegisteredDelivery(submitSm.getRegisteredDelivery()),
+                    submitSm.getReplaceIfPresent(),
+                    new GeneralDataCoding(
+                            false,
+                            false,
+                            MessageClass.CLASS1,
+                            Alphabet.valueOf(submitSm.getDataCoding())),
+                    (byte) 0,
+                    submitSm.getShortMessage());
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Sent a short message for exchange id '"
@@ -166,6 +187,9 @@ public class SmppProducer extends Defaul
             }
             exchange.getIn().setHeader(SmppBinding.ID, messageId);
         }
+        
+        // continue routing asynchronously
+        return false;
     }
 
     @Override