You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2014/08/14 23:36:19 UTC

git commit: CAMEL-7700: Limit the SQL component maxMessagesPerPoll option by using the jdbcTemplate.setMaxRows(size) method

Repository: camel
Updated Branches:
  refs/heads/master d5253916c -> 7764a5193


CAMEL-7700: Limit the SQL component maxMessagesPerPoll option by using the jdbcTemplate.setMaxRows(size) method


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7764a519
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7764a519
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7764a519

Branch: refs/heads/master
Commit: 7764a519355bdd0b13c36817022a2a4819eeb93a
Parents: d525391
Author: cmueller <cm...@apache.org>
Authored: Thu Aug 14 23:35:58 2014 +0200
Committer: cmueller <cm...@apache.org>
Committed: Thu Aug 14 23:36:04 2014 +0200

----------------------------------------------------------------------
 .../apache/camel/component/sql/SqlConsumer.java |  17 ++--
 .../sql/SqlConsumerMaxMessagesPerPollTest.java  | 101 +++++++++++++++++++
 .../sql/createAndPopulateDatabase4.sql          |  21 ++++
 3 files changed, 133 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7764a519/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 57e80e5..8f9800c 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -36,9 +36,6 @@ import org.springframework.dao.DataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.PreparedStatementCallback;
 
-/**
- *
- */
 public class SqlConsumer extends ScheduledBatchPollingConsumer {
 
     private final String query;
@@ -165,9 +162,8 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
         int total = exchanges.size();
 
         // limit if needed
-        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
-            log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
-            total = maxMessagesPerPoll;
+        if (maxMessagesPerPoll > 0 && total == maxMessagesPerPoll) {
+            log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was more messages in this poll.");
         }
 
         for (int index = 0; index < total && isBatchAllowed(); index++) {
@@ -311,4 +307,13 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
     public void setBreakBatchOnConsumeFail(boolean breakBatchOnConsumeFail) {
         this.breakBatchOnConsumeFail = breakBatchOnConsumeFail;
     }
+
+    @Override
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        super.setMaxMessagesPerPoll(maxMessagesPerPoll);
+
+        if (jdbcTemplate != null) {
+            jdbcTemplate.setMaxRows(maxMessagesPerPoll);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/7764a519/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java
new file mode 100644
index 0000000..51a7bb0
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlConsumerMaxMessagesPerPollTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+public class SqlConsumerMaxMessagesPerPollTest extends CamelTestSupport {
+
+    private EmbeddedDatabase db;
+
+    @Before
+    public void setUp() throws Exception {
+        db = new EmbeddedDatabaseBuilder()
+            .setType(EmbeddedDatabaseType.DERBY)
+            .addScript("sql/createAndPopulateDatabase4.sql")
+            .build();
+
+        super.setUp();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+
+        db.shutdown();
+    }
+
+    @Test
+    public void maxMessagesPerPoll() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(3);
+        assertMockEndpointsSatisfied();
+
+        List<Exchange> exchanges = mock.getReceivedExchanges();
+        assertBodyMapValue(1, "ID", exchanges.get(0));
+        assertBodyMapValue("Camel", "PROJECT", exchanges.get(0));
+        assertProperty(0, "CamelBatchIndex", exchanges.get(0));
+        assertProperty(2, "CamelBatchSize", exchanges.get(0));
+        assertProperty(Boolean.FALSE, "CamelBatchComplete", exchanges.get(0));
+
+        assertBodyMapValue(2, "ID", exchanges.get(1));
+        assertBodyMapValue("AMQ", "PROJECT", exchanges.get(1));
+        assertProperty(1, "CamelBatchIndex", exchanges.get(1));
+        assertProperty(2, "CamelBatchSize", exchanges.get(1));
+        assertProperty(Boolean.TRUE, "CamelBatchComplete", exchanges.get(1)); // end of the first batch
+
+        assertBodyMapValue(3, "ID", exchanges.get(2));
+        assertBodyMapValue("Linux", "PROJECT", exchanges.get(2));
+        assertProperty(0, "CamelBatchIndex", exchanges.get(2)); // the second batch
+        assertProperty(1, "CamelBatchSize", exchanges.get(2)); // only one entry in this batch
+        assertProperty(Boolean.TRUE, "CamelBatchComplete", exchanges.get(2)); // there are no more entries yet
+    }
+
+    private void assertProperty(Object value, String propertyName, Exchange exchange) {
+        assertEquals(value, exchange.getProperty(propertyName));
+    }
+
+    private void assertBodyMapValue(Object value, String key, Exchange exchange) {
+        assertEquals(value, exchange.getIn().getBody(Map.class).get(key));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                getContext().setTracing(true);
+                getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+                from("sql:select * from projects where processed = false order by id?maxMessagesPerPoll=2")
+                    .to("mock:result")
+                    .to("sql:update projects set processed = true where id = :#id");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/7764a519/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql
----------------------------------------------------------------------
diff --git a/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql b/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql
new file mode 100644
index 0000000..e8c6bd9
--- /dev/null
+++ b/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase4.sql
@@ -0,0 +1,21 @@
+-- ------------------------------------------------------------------------
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- ------------------------------------------------------------------------
+
+create table projects (id integer primary key GENERATED ALWAYS AS IDENTITY, project varchar(10), license varchar(5), description varchar(1000) default null, processed boolean);
+insert into projects (project, license, description, processed) values ('Camel', 'ASF', '', false);
+insert into projects (project, license, description, processed) values ('AMQ', 'ASF', '', false);
+insert into projects (project, license, description, processed) values ('Linux', 'XXX', '', false);