You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by jb...@apache.org on 2016/12/09 17:32:47 UTC
camel git commit: [CAMEL-10271] Fix jt400 polling consumer endpoint
Repository: camel
Updated Branches:
refs/heads/camel-2.16.x 9a58b0dd1 -> 975f1a18e
[CAMEL-10271] Fix jt400 polling consumer endpoint
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/975f1a18
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/975f1a18
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/975f1a18
Branch: refs/heads/camel-2.16.x
Commit: 975f1a18ec2c4aea781d52732e99bfb7383bfc87
Parents: 9a58b0d
Author: Jean-Baptiste Onofr� <jb...@apache.org>
Authored: Fri Dec 9 18:32:08 2016 +0100
Committer: Jean-Baptiste Onofr� <jb...@apache.org>
Committed: Fri Dec 9 18:32:08 2016 +0100
----------------------------------------------------------------------
.../component/jt400/Jt400Configuration.java | 44 ++++++++++------
.../component/jt400/Jt400DataQueueConsumer.java | 53 +++++++++++++-------
.../camel/component/jt400/Jt400Endpoint.java | 37 +++++++-------
3 files changed, 83 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/975f1a18/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
----------------------------------------------------------------------
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
index b33a25d..e6e4658 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
@@ -88,10 +88,10 @@ public class Jt400Configuration {
@UriParam
private int ccsid = DEFAULT_SYSTEM_CCSID;
-
+
@UriParam(defaultValue = "text")
private Format format = Format.text;
-
+
@UriParam
private boolean guiAvailable;
@@ -113,17 +113,20 @@ public class Jt400Configuration {
@UriParam
private Integer[] outputFieldsLengthArray;
+ @UriParam(label = "consumer", defaultValue = "30000")
+ private int readTimeout = 30000;
+
public Jt400Configuration(String endpointUri, AS400ConnectionPool connectionPool) throws URISyntaxException {
ObjectHelper.notNull(endpointUri, "endpointUri", this);
ObjectHelper.notNull(connectionPool, "connectionPool", this);
-
+
URI uri = new URI(endpointUri);
String[] credentials = uri.getUserInfo().split(":");
systemName = uri.getHost();
userID = credentials[0];
password = credentials[1];
objectPath = uri.getPath();
-
+
this.connectionPool = connectionPool;
}
@@ -184,7 +187,7 @@ public class Jt400Configuration {
}
// Options
-
+
/**
* Returns the CCSID to use for the connection with the AS/400 system.
* Returns -1 if the CCSID to use is the default system CCSID.
@@ -192,21 +195,21 @@ public class Jt400Configuration {
public int getCssid() {
return ccsid;
}
-
+
/**
* Sets the CCSID to use for the connection with the AS/400 system.
*/
public void setCcsid(int ccsid) {
this.ccsid = (ccsid < 0) ? DEFAULT_SYSTEM_CCSID : ccsid;
}
-
+
/**
* Returns the data format for sending messages.
*/
public Format getFormat() {
return format;
}
-
+
/**
* Sets the data format for sending messages.
*/
@@ -214,7 +217,7 @@ public class Jt400Configuration {
ObjectHelper.notNull(format, "format", this);
this.format = format;
}
-
+
/**
* Returns whether AS/400 prompting is enabled in the environment running
* Camel.
@@ -222,7 +225,7 @@ public class Jt400Configuration {
public boolean isGuiAvailable() {
return guiAvailable;
}
-
+
/**
* Sets whether AS/400 prompting is enabled in the environment running
* Camel.
@@ -301,6 +304,17 @@ public class Jt400Configuration {
this.outputFieldsLengthArray = outputFieldsLengthArray;
}
+ public int getReadTimeout() {
+ return readTimeout;
+ }
+
+ /**
+ * Timeout in millis the consumer will wait while trying to read a new message of the data queue.
+ */
+ public void setReadTimeout(int readTimeout) {
+ this.readTimeout = readTimeout;
+ }
+
public void setOutputFieldsIdx(String outputFieldsIdx) {
if (outputFieldsIdx != null) {
String[] outputArray = outputFieldsIdx.split(",");
@@ -324,13 +338,13 @@ public class Jt400Configuration {
}
// AS400 connections
-
+
/**
* Obtains an {@code AS400} object that connects to this endpoint. Since
* these objects represent limited resources, clients have the
* responsibility of {@link #releaseConnection(AS400) releasing them} when
* done.
- *
+ *
* @return an {@code AS400} object that connects to this endpoint
*/
public AS400 getConnection() {
@@ -361,10 +375,10 @@ public class Jt400Configuration {
throw new RuntimeCamelException("Unable to set the CSSID to use with " + system, e);
}
}
-
+
/**
* Releases a previously obtained {@code AS400} object from use.
- *
+ *
* @param connection a previously obtained {@code AS400} object to release
*/
public void releaseConnection(AS400 connection) {
@@ -372,4 +386,4 @@ public class Jt400Configuration {
connectionPool.returnConnectionToPool(connection);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/975f1a18/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
index 825af3c..8327ba2 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
@@ -22,17 +22,15 @@ import com.ibm.as400.access.DataQueueEntry;
import com.ibm.as400.access.KeyedDataQueue;
import com.ibm.as400.access.KeyedDataQueueEntry;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.DefaultExchange;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.impl.ScheduledPollConsumer;
/**
- * {@link org.apache.camel.PollingConsumer} that polls a data queue for data
+ * A scheduled {@link org.apache.camel.Consumer} that polls a data queue for data
*/
-public class Jt400DataQueueConsumer extends PollingConsumerSupport {
+public class Jt400DataQueueConsumer extends ScheduledPollConsumer {
- private final Jt400Endpoint endpoint;
-
/**
* Performs the lifecycle logic of this consumer.
*/
@@ -41,27 +39,46 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
/**
* Creates a new consumer instance
*/
- protected Jt400DataQueueConsumer(Jt400Endpoint endpoint) {
- super(endpoint);
- this.endpoint = endpoint;
+ public Jt400DataQueueConsumer(Jt400Endpoint endpoint, Processor processor) {
+ super(endpoint, processor);
this.queueService = new Jt400DataQueueService(endpoint);
}
@Override
+ public Jt400Endpoint getEndpoint() {
+ return (Jt400Endpoint) super.getEndpoint();
+ }
+
+ @Override
+ protected int poll() throws Exception {
+ Exchange exchange = receive(getEndpoint().getReadTimeout());
+ if (exchange != null) {
+ getProcessor().process(exchange);
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
protected void doStart() throws Exception {
queueService.start();
+ super.doStart();
}
@Override
protected void doStop() throws Exception {
+ super.doStop();
queueService.stop();
}
+ @Deprecated
public Exchange receive() {
// -1 to indicate a blocking read from data queue
return receive(-1);
}
+ @Deprecated
public Exchange receiveNoWait() {
return receive(0);
}
@@ -86,7 +103,7 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
public Exchange receive(long timeout) {
BaseDataQueue queue = queueService.getDataQueue();
try {
- if (endpoint.isKeyed()) {
+ if (getEndpoint().isKeyed()) {
return receive((KeyedDataQueue) queue, timeout);
} else {
return receive((DataQueue) queue, timeout);
@@ -107,11 +124,10 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
entry = queue.read(-1);
}
- Exchange exchange = new DefaultExchange(endpoint.getCamelContext());
- exchange.setFromEndpoint(endpoint);
+ Exchange exchange = getEndpoint().createExchange();
if (entry != null) {
exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, entry.getSenderInformation());
- if (endpoint.getFormat() == Jt400Configuration.Format.binary) {
+ if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) {
exchange.getIn().setBody(entry.getData());
} else {
exchange.getIn().setBody(entry.getString());
@@ -122,8 +138,8 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
}
private Exchange receive(KeyedDataQueue queue, long timeout) throws Exception {
- String key = endpoint.getSearchKey();
- String searchType = endpoint.getSearchType().name();
+ String key = getEndpoint().getSearchKey();
+ String searchType = getEndpoint().getSearchType().name();
KeyedDataQueueEntry entry;
if (timeout >= 0) {
int seconds = (int) timeout / 1000;
@@ -134,11 +150,10 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
entry = queue.read(key, -1, searchType);
}
- Exchange exchange = new DefaultExchange(endpoint.getCamelContext());
- exchange.setFromEndpoint(endpoint);
+ Exchange exchange = getEndpoint().createExchange();
if (entry != null) {
exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, entry.getSenderInformation());
- if (endpoint.getFormat() == Jt400Configuration.Format.binary) {
+ if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) {
exchange.getIn().setBody(entry.getData());
exchange.getIn().setHeader(Jt400Endpoint.KEY, entry.getKey());
} else {
@@ -150,4 +165,4 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport {
return null;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/975f1a18/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
index 14767e0..b487e61 100644
--- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
+++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
@@ -24,17 +24,19 @@ import com.ibm.as400.access.AS400;
import com.ibm.as400.access.AS400ConnectionPool;
import org.apache.camel.CamelException;
import org.apache.camel.Consumer;
-import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultPollingEndpoint;
+import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
+/**
+ * The jt400 component allows you to exchanges messages with an AS/400 system using data queues or program call.
+ */
@UriEndpoint(scheme = "jt400", title = "JT400", syntax = "jt400:userID:password/systemName/objectPath.type", consumerClass = Jt400DataQueueConsumer.class, label = "messaging")
-public class Jt400Endpoint extends DefaultPollingEndpoint {
+public class Jt400Endpoint extends ScheduledPollEndpoint {
public static final String KEY = "KEY";
public static final String SENDER_INFORMATION = "SENDER_INFORMATION";
@@ -45,7 +47,7 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
/**
* Creates a new AS/400 data queue endpoint using a default connection pool
* provided by the component.
- *
+ *
* @throws NullPointerException if {@code component} is null
*/
protected Jt400Endpoint(String endpointUri, Jt400Component component) throws CamelException {
@@ -67,13 +69,6 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
}
@Override
- public PollingConsumer createPollingConsumer() throws Exception {
- Jt400DataQueueConsumer answer = new Jt400DataQueueConsumer(this);
- configurePollingConsumer(answer);
- return answer;
- }
-
- @Override
public Producer createProducer() throws Exception {
if (Jt400Type.DTAQ == configuration.getType()) {
return new Jt400DataQueueProducer(this);
@@ -85,7 +80,7 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
@Override
public Consumer createConsumer(Processor processor) throws Exception {
if (Jt400Type.DTAQ == configuration.getType()) {
- Consumer consumer = new Jt400DataQueueConsumer(this);
+ Consumer consumer = new Jt400DataQueueConsumer(this, processor);
configureConsumer(consumer);
return consumer;
} else {
@@ -102,16 +97,16 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
* Obtains an {@code AS400} object that connects to this endpoint. Since
* these objects represent limited resources, clients have the
* responsibility of {@link #releaseSystem(AS400) releasing them} when done.
- *
+ *
* @return an {@code AS400} object that connects to this endpoint
*/
protected AS400 getSystem() {
return configuration.getConnection();
}
-
+
/**
* Releases a previously obtained {@code AS400} object from use.
- *
+ *
* @param system a previously obtained {@code AS400} object
*/
protected void releaseSystem(AS400 system) {
@@ -121,7 +116,7 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
/**
* Returns the fully qualified integrated file system path name of the data
* queue of this endpoint.
- *
+ *
* @return the fully qualified integrated file system path name of the data
* queue of this endpoint
*/
@@ -257,4 +252,12 @@ public class Jt400Endpoint extends DefaultPollingEndpoint {
return configuration.isSecured();
}
-}
+ public int getReadTimeout() {
+ return configuration.getReadTimeout();
+ }
+
+ public void setReadTimeout(int readTimeout) {
+ configuration.setReadTimeout(readTimeout);
+ }
+
+}
\ No newline at end of file