You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by jb...@apache.org on 2016/12/09 17:32:47 UTC

camel git commit: [CAMEL-10271] Fix jt400 polling consumer endpoint

Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 9a58b0dd1 -> 975f1a18e


[CAMEL-10271] Fix jt400 polling consumer endpoint


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/975f1a18
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/975f1a18
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/975f1a18

Branch: refs/heads/camel-2.16.x
Commit: 975f1a18ec2c4aea781d52732e99bfb7383bfc87
Parents: 9a58b0d
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Fri Dec 9 18:32:08 2016 +0100
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Fri Dec 9 18:32:08 2016 +0100

----------------------------------------------------------------------
 .../component/jt400/Jt400Configuration.java     | 44 ++++++++++------
 .../component/jt400/Jt400DataQueueConsumer.java | 53 +++++++++++++-------
 .../camel/component/jt400/Jt400Endpoint.java    | 37 +++++++-------
 3 files changed, 83 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/975f1a18/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
----------------------------------------------------------------------
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
index b33a25d..e6e4658 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
@@ -88,10 +88,10 @@ public class Jt400Configuration {
 
     @UriParam
     private int ccsid = DEFAULT_SYSTEM_CCSID;
-    
+
     @UriParam(defaultValue = "text")
     private Format format = Format.text;
-    
+
     @UriParam
     private boolean guiAvailable;
 
@@ -113,17 +113,20 @@ public class Jt400Configuration {
     @UriParam
     private Integer[] outputFieldsLengthArray;
 
+    @UriParam(label = "consumer", defaultValue = "30000")
+    private int readTimeout = 30000;
+
     public Jt400Configuration(String endpointUri, AS400ConnectionPool connectionPool) throws URISyntaxException {
         ObjectHelper.notNull(endpointUri, "endpointUri", this);
         ObjectHelper.notNull(connectionPool, "connectionPool", this);
-        
+
         URI uri = new URI(endpointUri);
         String[] credentials = uri.getUserInfo().split(":");
         systemName = uri.getHost();
         userID = credentials[0];
         password = credentials[1];
         objectPath = uri.getPath();
-        
+
         this.connectionPool = connectionPool;
     }
 
@@ -184,7 +187,7 @@ public class Jt400Configuration {
     }
 
     // Options
-    
+
     /**
      * Returns the CCSID to use for the connection with the AS/400 system.
      * Returns -1 if the CCSID to use is the default system CCSID.
@@ -192,21 +195,21 @@ public class Jt400Configuration {
     public int getCssid() {
         return ccsid;
     }
-    
+
     /**
      * Sets the CCSID to use for the connection with the AS/400 system.
      */
     public void setCcsid(int ccsid) {
         this.ccsid = (ccsid < 0) ? DEFAULT_SYSTEM_CCSID : ccsid;
     }
-    
+
     /**
      * Returns the data format for sending messages.
      */
     public Format getFormat() {
         return format;
     }
-    
+
     /**
      * Sets the data format for sending messages.
      */
@@ -214,7 +217,7 @@ public class Jt400Configuration {
         ObjectHelper.notNull(format, "format", this);
         this.format = format;
     }
-    
+
     /**
      * Returns whether AS/400 prompting is enabled in the environment running
      * Camel.
@@ -222,7 +225,7 @@ public class Jt400Configuration {
     public boolean isGuiAvailable() {
         return guiAvailable;
     }
-    
+
     /**
      * Sets whether AS/400 prompting is enabled in the environment running
      * Camel.
@@ -301,6 +304,17 @@ public class Jt400Configuration {
         this.outputFieldsLengthArray = outputFieldsLengthArray;
     }
 
+    public int getReadTimeout() {
+        return readTimeout;
+    }
+
+    /**
+     * Timeout in millis the consumer will wait while trying to read a new message of the data queue.
+     */
+    public void setReadTimeout(int readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+
     public void setOutputFieldsIdx(String outputFieldsIdx) {
         if (outputFieldsIdx != null) {
             String[] outputArray = outputFieldsIdx.split(",");
@@ -324,13 +338,13 @@ public class Jt400Configuration {
     }
 
     // AS400 connections
-    
+
     /**
      * Obtains an {@code AS400} object that connects to this endpoint. Since
      * these objects represent limited resources, clients have the
      * responsibility of {@link #releaseConnection(AS400) releasing them} when
      * done.
-     * 
+     *
      * @return an {@code AS400} object that connects to this endpoint
      */
     public AS400 getConnection() {
@@ -361,10 +375,10 @@ public class Jt400Configuration {
             throw new RuntimeCamelException("Unable to set the CSSID to use with " + system, e);
         }
     }
-    
+
     /**
      * Releases a previously obtained {@code AS400} object from use.
-     * 
+     *
      * @param connection a previously obtained {@code AS400} object to release
      */
     public void releaseConnection(AS400 connection) {
@@ -372,4 +386,4 @@ public class Jt400Configuration {
         connectionPool.returnConnectionToPool(connection);
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/975f1a18/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
index 825af3c..8327ba2 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
@@ -22,17 +22,15 @@ import com.ibm.as400.access.DataQueueEntry;
 import com.ibm.as400.access.KeyedDataQueue;
 import com.ibm.as400.access.KeyedDataQueueEntry;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.impl.ScheduledPollConsumer;
 
 /**
- * {@link org.apache.camel.PollingConsumer} that polls a data queue for data
+ * A scheduled {@link org.apache.camel.Consumer} that polls a data queue for data
  */
-public class Jt400DataQueueConsumer extends PollingConsumerSupport {
+public class Jt400DataQueueConsumer extends ScheduledPollConsumer {
 
-    private final Jt400Endpoint endpoint;
-    
     /**
      * Performs the lifecycle logic of this consumer.
      */
@@ -41,27 +39,46 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
     /**
      * Creates a new consumer instance
      */
-    protected Jt400DataQueueConsumer(Jt400Endpoint endpoint) {
-        super(endpoint);
-        this.endpoint = endpoint;
+    public Jt400DataQueueConsumer(Jt400Endpoint endpoint, Processor processor) {
+        super(endpoint, processor);
         this.queueService = new Jt400DataQueueService(endpoint);
     }
 
     @Override
+    public Jt400Endpoint getEndpoint() {
+        return (Jt400Endpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        Exchange exchange = receive(getEndpoint().getReadTimeout());
+        if (exchange != null) {
+            getProcessor().process(exchange);
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
     protected void doStart() throws Exception {
         queueService.start();
+        super.doStart();
     }
 
     @Override
     protected void doStop() throws Exception {
+        super.doStop();
         queueService.stop();
     }
 
+    @Deprecated
     public Exchange receive() {
         // -1 to indicate a blocking read from data queue
         return receive(-1);
     }
 
+    @Deprecated
     public Exchange receiveNoWait() {
         return receive(0);
     }
@@ -86,7 +103,7 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
     public Exchange receive(long timeout) {
         BaseDataQueue queue = queueService.getDataQueue();
         try {
-            if (endpoint.isKeyed()) {
+            if (getEndpoint().isKeyed()) {
                 return receive((KeyedDataQueue) queue, timeout);
             } else {
                 return receive((DataQueue) queue, timeout);
@@ -107,11 +124,10 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
             entry = queue.read(-1);
         }
 
-        Exchange exchange = new DefaultExchange(endpoint.getCamelContext());
-        exchange.setFromEndpoint(endpoint);
+        Exchange exchange = getEndpoint().createExchange();
         if (entry != null) {
             exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, entry.getSenderInformation());
-            if (endpoint.getFormat() == Jt400Configuration.Format.binary) {
+            if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) {
                 exchange.getIn().setBody(entry.getData());
             } else {
                 exchange.getIn().setBody(entry.getString());
@@ -122,8 +138,8 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
     }
 
     private Exchange receive(KeyedDataQueue queue, long timeout) throws Exception {
-        String key = endpoint.getSearchKey();
-        String searchType = endpoint.getSearchType().name();
+        String key = getEndpoint().getSearchKey();
+        String searchType = getEndpoint().getSearchType().name();
         KeyedDataQueueEntry entry;
         if (timeout >= 0) {
             int seconds = (int) timeout / 1000;
@@ -134,11 +150,10 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
             entry = queue.read(key, -1, searchType);
         }
 
-        Exchange exchange = new DefaultExchange(endpoint.getCamelContext());
-        exchange.setFromEndpoint(endpoint);
+        Exchange exchange = getEndpoint().createExchange();
         if (entry != null) {
             exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, entry.getSenderInformation());
-            if (endpoint.getFormat() == Jt400Configuration.Format.binary) {
+            if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) {
                 exchange.getIn().setBody(entry.getData());
                 exchange.getIn().setHeader(Jt400Endpoint.KEY, entry.getKey());
             } else {
@@ -150,4 +165,4 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
         return null;
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/975f1a18/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
index 14767e0..b487e61 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
@@ -24,17 +24,19 @@ import com.ibm.as400.access.AS400;
 import com.ibm.as400.access.AS400ConnectionPool;
 import org.apache.camel.CamelException;
 import org.apache.camel.Consumer;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultPollingEndpoint;
+import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 
+/**
+ * The jt400 component allows you to exchanges messages with an AS/400 system using data queues or program call.
+ */
 @UriEndpoint(scheme = "jt400", title = "JT400", syntax = "jt400:userID:password/systemName/objectPath.type", consumerClass = Jt400DataQueueConsumer.class, label = "messaging")
-public class Jt400Endpoint extends DefaultPollingEndpoint {
+public class Jt400Endpoint extends ScheduledPollEndpoint {
 
     public static final String KEY = "KEY";
     public static final String SENDER_INFORMATION = "SENDER_INFORMATION";
@@ -45,7 +47,7 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
     /**
      * Creates a new AS/400 data queue endpoint using a default connection pool
      * provided by the component.
-     * 
+     *
      * @throws NullPointerException if {@code component} is null
      */
     protected Jt400Endpoint(String endpointUri, Jt400Component component) throws CamelException {
@@ -67,13 +69,6 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
     }
 
     @Override
-    public PollingConsumer createPollingConsumer() throws Exception {
-        Jt400DataQueueConsumer answer = new Jt400DataQueueConsumer(this);
-        configurePollingConsumer(answer);
-        return answer;
-    }
-
-    @Override
     public Producer createProducer() throws Exception {
         if (Jt400Type.DTAQ == configuration.getType()) {
             return new Jt400DataQueueProducer(this);
@@ -85,7 +80,7 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         if (Jt400Type.DTAQ == configuration.getType()) {
-            Consumer consumer = new Jt400DataQueueConsumer(this);
+            Consumer consumer = new Jt400DataQueueConsumer(this, processor);
             configureConsumer(consumer);
             return consumer;
         } else {
@@ -102,16 +97,16 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
      * Obtains an {@code AS400} object that connects to this endpoint. Since
      * these objects represent limited resources, clients have the
      * responsibility of {@link #releaseSystem(AS400) releasing them} when done.
-     * 
+     *
      * @return an {@code AS400} object that connects to this endpoint
      */
     protected AS400 getSystem() {
         return configuration.getConnection();
     }
-    
+
     /**
      * Releases a previously obtained {@code AS400} object from use.
-     * 
+     *
      * @param system a previously obtained {@code AS400} object
      */
     protected void releaseSystem(AS400 system) {
@@ -121,7 +116,7 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
     /**
      * Returns the fully qualified integrated file system path name of the data
      * queue of this endpoint.
-     * 
+     *
      * @return the fully qualified integrated file system path name of the data
      *         queue of this endpoint
      */
@@ -257,4 +252,12 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
         return configuration.isSecured();
     }
 
-}
+    public int getReadTimeout() {
+        return configuration.getReadTimeout();
+    }
+
+    public void setReadTimeout(int readTimeout) {
+        configuration.setReadTimeout(readTimeout);
+    }
+
+}
\ No newline at end of file