You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/02/15 09:13:11 UTC

svn commit: r1070801 - in /camel/trunk/components/camel-mybatis/src: main/java/org/apache/camel/component/mybatis/ test/java/org/apache/camel/component/mybatis/ test/resources/

Author: davsclaus
Date: Tue Feb 15 08:13:10 2011
New Revision: 1070801

URL: http://svn.apache.org/viewvc?rev=1070801&view=rev
Log:
CAMEL-2954: added consumer to mybatis component.

Added:
    camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/DefaultMyBatisProcessingStrategy.java
    camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
    camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisProcessingStrategy.java
    camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisBatchConsumerTest.java
    camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisPollingDelayRouteTest.java
    camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisQueueTest.java
    camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisRouteEmptyResultSetTest.java
    camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownAllTasksTest.java
    camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownCurrentTaskOnlyTest.java
    camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml   (contents, props changed)
      - copied, changed from r1070784, camel/trunk/components/camel-mybatis/src/test/resources/Configuration.xml
Removed:
    camel/trunk/components/camel-mybatis/src/test/resources/Configuration.xml
Modified:
    camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
    camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java
    camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisTestSupport.java
    camel/trunk/components/camel-mybatis/src/test/resources/log4j.properties

Added: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/DefaultMyBatisProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/DefaultMyBatisProcessingStrategy.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/DefaultMyBatisProcessingStrategy.java (added)
+++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/DefaultMyBatisProcessingStrategy.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.ibatis.session.SqlSession;
+
+/**
+ * @version $Revision$
+ */
+public class DefaultMyBatisProcessingStrategy implements MyBatisProcessingStrategy {
+
+    public void commit(MyBatisEndpoint endpoint, Exchange exchange, Object data, String consumeStatements) throws Exception {
+        SqlSession session = endpoint.getSqlSessionFactory().openSession();
+
+        String[] statements = consumeStatements.split(",");
+        try {
+            for (String statement : statements) {
+                session.update(statement.trim(), data);
+            }
+        } finally {
+            session.commit();
+            session.close();
+        }
+    }
+
+    public List poll(MyBatisConsumer consumer, MyBatisEndpoint endpoint) throws Exception {
+        SqlSession session = endpoint.getSqlSessionFactory().openSession();
+        try {
+            return session.selectList(endpoint.getStatement(), null);
+        } finally {
+            session.close();
+        }
+    }
+}

Modified: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java?rev=1070801&r1=1070800&r2=1070801&view=diff
==============================================================================
--- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java (original)
+++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisComponent.java Tue Feb 15 08:13:10 2011
@@ -17,11 +17,13 @@
 package org.apache.camel.component.mybatis;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.Reader;
 import java.util.Map;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.ibatis.io.Resources;
 import org.apache.ibatis.session.SqlSessionFactory;
@@ -33,7 +35,7 @@ import org.apache.ibatis.session.SqlSess
 public class MyBatisComponent extends DefaultComponent {
 
     private SqlSessionFactory sqlSessionFactory;
-    private String configurationUri = "Configuration.xml";
+    private String configurationUri = "SqlMapConfig.xml";
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
@@ -44,8 +46,12 @@ public class MyBatisComponent extends De
 
     private SqlSessionFactory createSqlSessionFactory() throws IOException {
         ObjectHelper.notNull(configurationUri, "configurationUri", this);
-        Reader reader = Resources.getResourceAsReader(configurationUri);
-        return new SqlSessionFactoryBuilder().build(reader);
+        InputStream is = getCamelContext().getClassResolver().loadResourceAsStream(configurationUri);
+        try {
+            return new SqlSessionFactoryBuilder().build(is);
+        } finally {
+            IOHelper.close(is);
+        }
     }
 
     public synchronized SqlSessionFactory getSqlSessionFactory() throws IOException {

Added: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java (added)
+++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.camel.BatchConsumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumer to read data from a database.
+ *
+ * @version $Revision$
+ */
+public class MyBatisConsumer extends ScheduledPollConsumer implements BatchConsumer, ShutdownAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MyBatisConsumer.class);
+
+    private final class DataHolder {
+        private Exchange exchange;
+        private Object data;
+
+        private DataHolder() {
+        }
+    }
+
+    protected volatile ShutdownRunningTask shutdownRunningTask;
+    protected volatile int pendingExchanges;
+
+    /**
+     * Statement to run after data has been processed in the route
+     */
+    private String onConsume;
+
+    /**
+     * Process resultset individually or as a list
+     */
+    private boolean useIterator = true;
+
+    /**
+     * Whether allow empty resultset to be routed to the next hop
+     */
+    private boolean routeEmptyResultSet;
+
+    private int maxMessagesPerPoll;
+
+    public MyBatisConsumer(MyBatisEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    public MyBatisEndpoint getEndpoint() {
+        return (MyBatisEndpoint) super.getEndpoint();
+    }
+
+    /**
+     * Polls the database
+     */
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        // poll data from the database
+        MyBatisEndpoint endpoint = getEndpoint();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Polling: " + endpoint);
+        }
+        List<Object> data = CastUtils.cast(endpoint.getProcessingStrategy().poll(this, getEndpoint()));
+
+        // create a list of exchange objects with the data
+        Queue<DataHolder> answer = new LinkedList<DataHolder>();
+        if (useIterator) {
+            for (Object item : data) {
+                Exchange exchange = createExchange(item);
+                DataHolder holder = new DataHolder();
+                holder.exchange = exchange;
+                holder.data = item;
+                answer.add(holder);
+            }
+        } else {
+            if (!data.isEmpty() || routeEmptyResultSet) {
+                Exchange exchange = createExchange(data);
+                DataHolder holder = new DataHolder();
+                holder.exchange = exchange;
+                holder.data = data;
+                answer.add(holder);
+            }
+        }
+
+        // process all the exchanges in this batch
+        return processBatch(CastUtils.cast(answer));
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        final MyBatisEndpoint endpoint = getEndpoint();
+
+        int total = exchanges.size();
+
+        // limit if needed
+        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
+            LOG.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
+            total = maxMessagesPerPoll;
+        }
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            DataHolder holder = ObjectHelper.cast(DataHolder.class, exchanges.poll());
+            Exchange exchange = holder.exchange;
+            Object data = holder.data;
+
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // process the current exchange
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Processing exchange: " + exchange);
+            }
+            getProcessor().process(exchange);
+
+            try {
+                if (onConsume != null) {
+                    endpoint.getProcessingStrategy().commit(endpoint, exchange, data, onConsume);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+
+        return total;
+    }
+
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // store a reference what to do in case when shutting down and we have pending messages
+        this.shutdownRunningTask = shutdownRunningTask;
+        // do not defer shutdown
+        return false;
+    }
+
+    public int getPendingExchangesSize() {
+        // only return the real pending size in case we are configured to complete all tasks
+        if (ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask) {
+            return pendingExchanges;
+        } else {
+            return 0;
+        }
+    }
+
+    public void prepareShutdown() {
+        // noop
+    }
+
+    public boolean isBatchAllowed() {
+        // stop if we are not running
+        boolean answer = isRunAllowed();
+        if (!answer) {
+            return false;
+        }
+
+        if (shutdownRunningTask == null) {
+            // we are not shutting down so continue to run
+            return true;
+        }
+
+        // we are shutting down so only continue if we are configured to complete all tasks
+        return ShutdownRunningTask.CompleteAllTasks == shutdownRunningTask;
+    }
+
+    private Exchange createExchange(Object data) {
+        final MyBatisEndpoint endpoint = getEndpoint();
+        final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
+
+        Message msg = exchange.getIn();
+        msg.setBody(data);
+        msg.setHeader(MyBatisConstants.MYBATIS_STATEMENT_NAME, endpoint.getStatement());
+
+        return exchange;
+    }
+
+    /**
+     * Gets the statement(s) to run after successful processing.
+     * Use comma to separate multiple statements.
+     */
+    public String getOnConsume() {
+        return onConsume;
+    }
+
+    /**
+     * Sets the statement to run after successful processing.
+     * Use comma to separate multiple statements.
+     */
+    public void setOnConsume(String onConsume) {
+        this.onConsume = onConsume;
+    }
+
+    /**
+     * Indicates how resultset should be delivered to the route
+     */
+    public boolean isUseIterator() {
+        return useIterator;
+    }
+
+    /**
+     * Sets how resultset should be delivered to route.
+     * Indicates delivery as either a list or individual object.
+     * defaults to true.
+     */
+    public void setUseIterator(boolean useIterator) {
+        this.useIterator = useIterator;
+    }
+
+    /**
+     * Indicates whether empty resultset should be allowed to be sent to the next hop or not
+     */
+    public boolean isRouteEmptyResultSet() {
+        return routeEmptyResultSet;
+    }
+
+    /**
+     * Sets whether empty resultset should be allowed to be sent to the next hop.
+     * defaults to false. So the empty resultset will be filtered out.
+     */
+    public void setRouteEmptyResultSet(boolean routeEmptyResultSet) {
+        this.routeEmptyResultSet = routeEmptyResultSet;
+    }
+}

Modified: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java?rev=1070801&r1=1070800&r2=1070801&view=diff
==============================================================================
--- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java (original)
+++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisEndpoint.java Tue Feb 15 08:13:10 2011
@@ -22,17 +22,19 @@ import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultPollingEndpoint;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.ibatis.session.SqlSessionFactory;
 
 /**
  * @version $Revision$
  */
-public class MyBatisEndpoint extends DefaultEndpoint {
+public class MyBatisEndpoint extends DefaultPollingEndpoint {
 
+    private MyBatisProcessingStrategy processingStrategy;
     private String statement;
     private StatementType statementType;
+    private int maxMessagesPerPoll;
 
     public MyBatisEndpoint() {
     }
@@ -49,7 +51,11 @@ public class MyBatisEndpoint extends Def
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new UnsupportedOperationException("Consumer not supported by MyBatis component: " + getEndpointUri());
+        ObjectHelper.notNull(statement, "statement", this);
+        MyBatisConsumer consumer = new MyBatisConsumer(this, processor);
+        consumer.setMaxMessagesPerPoll(getMaxMessagesPerPoll());
+        configureConsumer(consumer);
+        return consumer;
     }
 
     public boolean isSingleton() {
@@ -80,4 +86,24 @@ public class MyBatisEndpoint extends Def
     public void setStatementType(StatementType statementType) {
         this.statementType = statementType;
     }
+
+    public synchronized MyBatisProcessingStrategy getProcessingStrategy() {
+        if (processingStrategy == null) {
+            processingStrategy = new DefaultMyBatisProcessingStrategy();
+        }
+        return processingStrategy;
+    }
+
+    public void setProcessingStrategy(MyBatisProcessingStrategy processingStrategy) {
+        this.processingStrategy = processingStrategy;
+    }
+
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
 }

Added: camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisProcessingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisProcessingStrategy.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisProcessingStrategy.java (added)
+++ camel/trunk/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisProcessingStrategy.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Processing strategy for dealing with MyBatis when consuming.
+ *
+ * @version $Revision$
+ */
+public interface MyBatisProcessingStrategy {
+
+    /**
+     * Called when record is being queried.
+     *
+     * @param consumer the consumer
+     * @param endpoint the endpoint
+     * @return Results of the query as a {@link List}
+     * @throws Exception can be thrown in case of error
+     */
+    List poll(MyBatisConsumer consumer, MyBatisEndpoint endpoint) throws Exception;
+
+    /**
+     * Commit callback if there are a statements to be run after processing.
+     *
+     * @param endpoint          the endpoint
+     * @param exchange          The exchange after it has been processed
+     * @param data              The original data delivered to the route
+     * @param consumeStatements Name of the statement(s) to run, will use SQL update. Use comma to provide multiple statements to run.
+     * @throws Exception can be thrown in case of error
+     */
+    void commit(MyBatisEndpoint endpoint, Exchange exchange, Object data, String consumeStatements) throws Exception;
+
+}

Added: camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisBatchConsumerTest.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisBatchConsumerTest.java (added)
+++ camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisBatchConsumerTest.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class MyBatisBatchConsumerTest extends MyBatisTestSupport {
+
+    @Test
+    public void testBatchConsumer() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+        mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+        mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("mybatis:selectAllAccounts").to("mock:result");
+            }
+        };
+    }
+
+}

Added: camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisPollingDelayRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisPollingDelayRouteTest.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisPollingDelayRouteTest.java (added)
+++ camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisPollingDelayRouteTest.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class MyBatisPollingDelayRouteTest extends MyBatisTestSupport {
+
+    @Test
+    public void testSendAccountBean() throws Exception {
+        long start = System.currentTimeMillis();
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+
+        assertMockEndpointsSatisfied();
+        long delta = System.currentTimeMillis() - start;
+
+        assertTrue("Should not take that long: " + delta, delta < 7000);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // run this timer every 2nd second, that will select data from the database and send it to the mock endpoint
+                from("timer://pollTheDatabase?delay=2000").to("mybatis:selectAllAccounts?statementType=SelectList").to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+}

Added: camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisQueueTest.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisQueueTest.java (added)
+++ camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisQueueTest.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class MyBatisQueueTest extends CamelTestSupport {
+
+    @Test
+    public void testConsume() throws Exception {
+
+        MockEndpoint endpoint = getMockEndpoint("mock:results");
+        endpoint.expectedMinimumMessageCount(2);
+
+        Account account = new Account();
+        account.setId(1);
+        account.setFirstName("Bob");
+        account.setLastName("Denver");
+        account.setEmailAddress("TryGuessingGilligan@gmail.com");
+
+        template.sendBody("direct:start", account);
+
+        account = new Account();
+        account.setId(2);
+        account.setFirstName("Alan");
+        account.setLastName("Hale");
+        account.setEmailAddress("TryGuessingSkipper@gmail.com");
+
+        template.sendBody("direct:start", account);
+
+        assertMockEndpointsSatisfied();
+
+        // need a delay here on slower machines
+        Thread.sleep(1000);
+
+        // now lets poll that the account has been inserted
+        List body = template.requestBody("mybatis:selectProcessedAccounts?statementType=SelectList", null, List.class);
+
+        assertEquals("Wrong size: " + body, 2, body.size());
+        Account actual = assertIsInstanceOf(Account.class, body.get(0));
+
+        assertEquals("Account.getFirstName()", "Bob", actual.getFirstName());
+        assertEquals("Account.getLastName()", "Denver", actual.getLastName());
+
+        body = template.requestBody("mybatis:selectUnprocessedAccounts?statementType=SelectList", null, List.class);
+        assertEquals("Wrong size: " + body, 0, body.size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                from("mybatis:selectUnprocessedAccounts?consumer.onConsume=consumeAccount").to("mock:results");
+                // END SNIPPET: e1
+
+                from("direct:start").to("mybatis:insertAccount?statementType=Insert");
+            }
+        };
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        // lets create the database...
+        Connection connection = createConnection();
+        Statement statement = connection.createStatement();
+        statement.execute("create table ACCOUNT ( ACC_ID INTEGER , ACC_FIRST_NAME VARCHAR(255), ACC_LAST_NAME VARCHAR(255), ACC_EMAIL VARCHAR(255), PROCESSED BOOLEAN DEFAULT false)");
+        connection.close();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        Connection connection = createConnection();
+        Statement statement = connection.createStatement();
+        statement.execute("drop table ACCOUNT");
+        connection.close();
+
+        super.tearDown();
+    }
+
+    private Connection createConnection() throws Exception {
+        MyBatisEndpoint endpoint = resolveMandatoryEndpoint("mybatis:Account", MyBatisEndpoint.class);
+        return endpoint.getSqlSessionFactory().getConfiguration().getEnvironment().getDataSource().getConnection();
+    }
+
+}

Added: camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisRouteEmptyResultSetTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisRouteEmptyResultSetTest.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisRouteEmptyResultSetTest.java (added)
+++ camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisRouteEmptyResultSetTest.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import java.util.ArrayList;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class MyBatisRouteEmptyResultSetTest extends MyBatisTestSupport {
+
+    @Test
+    public void testRouteEmptyResultSet() throws Exception {
+        MockEndpoint endpoint = getMockEndpoint("mock:results");
+        endpoint.expectedMinimumMessageCount(1);
+
+        assertMockEndpointsSatisfied();
+
+        // should be an empty list
+        assertEquals("Should be an empty list", 0, endpoint.getReceivedExchanges().get(0).getIn().getBody(ArrayList.class).size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("mybatis:selectAllAccounts?consumer.useIterator=false&consumer.routeEmptyResultSet=true").to("mock:results");
+            }
+        };
+    }
+
+    @Override
+    protected boolean createTestData() {
+        // no test data so an empty resultset
+        return false;
+    }
+
+}

Added: camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownAllTasksTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownAllTasksTest.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownAllTasksTest.java (added)
+++ camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownAllTasksTest.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class MyBatisShutdownAllTasksTest extends MyBatisTestSupport {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        // super will insert 2 accounts already
+
+        Account account = new Account();
+        account.setId(881);
+        account.setFirstName("A");
+        account.setLastName("A");
+        account.setEmailAddress("a@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(882);
+        account.setFirstName("B");
+        account.setLastName("B");
+        account.setEmailAddress("b@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(883);
+        account.setFirstName("C");
+        account.setLastName("C");
+        account.setEmailAddress("c@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(884);
+        account.setFirstName("D");
+        account.setLastName("D");
+        account.setEmailAddress("d@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(885);
+        account.setFirstName("E");
+        account.setLastName("E");
+        account.setEmailAddress("e@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(886);
+        account.setFirstName("F");
+        account.setLastName("F");
+        account.setEmailAddress("f@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+    }
+
+    @Test
+    public void testShutdownAllTasks() throws Exception {
+        context.startRoute("route1");
+
+        MockEndpoint bar = getMockEndpoint("mock:bar");
+        bar.expectedMinimumMessageCount(1);
+        bar.setResultWaitTime(3000);
+
+        assertMockEndpointsSatisfied();
+
+        // shutdown during processing
+        context.stop();
+
+        // sleep a little
+        Thread.sleep(1000);
+
+        // should route all 8
+        assertEquals("Should complete all messages", 8, bar.getReceivedCounter());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("mybatis:selectAllAccounts").noAutoStartup().routeId("route1")
+                     // let it complete all tasks
+                     .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks)
+                     .delay(1000).to("seda:foo");
+
+                from("seda:foo").routeId("route2").to("mock:bar");
+            }
+        };
+    }
+}

Added: camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownCurrentTaskOnlyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownCurrentTaskOnlyTest.java?rev=1070801&view=auto
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownCurrentTaskOnlyTest.java (added)
+++ camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisShutdownCurrentTaskOnlyTest.java Tue Feb 15 08:13:10 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mybatis;
+
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class MyBatisShutdownCurrentTaskOnlyTest extends MyBatisTestSupport {
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        // super will insert 2 accounts already
+
+        Account account = new Account();
+        account.setId(881);
+        account.setFirstName("A");
+        account.setLastName("A");
+        account.setEmailAddress("a@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(882);
+        account.setFirstName("B");
+        account.setLastName("B");
+        account.setEmailAddress("b@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(883);
+        account.setFirstName("C");
+        account.setLastName("C");
+        account.setEmailAddress("c@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(884);
+        account.setFirstName("D");
+        account.setLastName("D");
+        account.setEmailAddress("d@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(885);
+        account.setFirstName("E");
+        account.setLastName("E");
+        account.setEmailAddress("e@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+        account = new Account();
+        account.setId(886);
+        account.setFirstName("F");
+        account.setLastName("F");
+        account.setEmailAddress("f@gmail.com");
+
+        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+    }
+
+    @Test
+    public void testShutdownCompleteCurrentTaskOnly() throws Exception {
+        MockEndpoint bar = getMockEndpoint("mock:bar");
+        bar.expectedMinimumMessageCount(1);
+        bar.setResultWaitTime(3000);
+
+        assertMockEndpointsSatisfied();
+
+        // shutdown during processing
+        context.stop();
+
+        // should NOT route all 8
+        assertTrue("Should NOT complete all messages, was: " + bar.getReceivedCounter(), bar.getReceivedCounter() < 8);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("mybatis:selectAllAccounts").routeId("route1")
+                     // let it complete only current task so we shutdown faster
+                     .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
+                     .delay(1000).to("seda:foo");
+
+                from("seda:foo").routeId("route2").to("mock:bar");
+            }
+        };
+    }
+
+}

Modified: camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisTestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisTestSupport.java?rev=1070801&r1=1070800&r2=1070801&view=diff
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisTestSupport.java (original)
+++ camel/trunk/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisTestSupport.java Tue Feb 15 08:13:10 2011
@@ -28,6 +28,10 @@ import org.junit.Before;
  */
 public abstract class MyBatisTestSupport extends CamelTestSupport {
 
+    protected boolean createTestData() {
+        return true;
+    }
+
     @Override
     @Before
     public void setUp() throws Exception {
@@ -39,19 +43,21 @@ public abstract class MyBatisTestSupport
         statement.execute("create table ACCOUNT ( ACC_ID INTEGER , ACC_FIRST_NAME VARCHAR(255), ACC_LAST_NAME VARCHAR(255), ACC_EMAIL VARCHAR(255)  )");
         connection.close();
 
-        Account account = new Account();
-        account.setId(123);
-        account.setFirstName("James");
-        account.setLastName("Strachan");
-        account.setEmailAddress("TryGuessing@gmail.com");
-        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
-
-        account = new Account();
-        account.setId(456);
-        account.setFirstName("Claus");
-        account.setLastName("Ibsen");
-        account.setEmailAddress("Noname@gmail.com");
-        template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+        if (createTestData()) {
+            Account account = new Account();
+            account.setId(123);
+            account.setFirstName("James");
+            account.setLastName("Strachan");
+            account.setEmailAddress("TryGuessing@gmail.com");
+            template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+
+            account = new Account();
+            account.setId(456);
+            account.setFirstName("Claus");
+            account.setLastName("Ibsen");
+            account.setEmailAddress("Noname@gmail.com");
+            template.sendBody("mybatis:insertAccount?statementType=Insert", account);
+        }
     }
 
     @Override

Copied: camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml (from r1070784, camel/trunk/components/camel-mybatis/src/test/resources/Configuration.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml?p2=camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml&p1=camel/trunk/components/camel-mybatis/src/test/resources/Configuration.xml&r1=1070784&r2=1070801&rev=1070801&view=diff
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/resources/Configuration.xml (original)
+++ camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml Tue Feb 15 08:13:10 2011
@@ -23,7 +23,7 @@
 <configuration>
 
     <settings>
-        <setting name="cacheEnabled" value="false"/>
+        <setting name="useGeneratedKeys" value="false"/>
     </settings>
 
     <!-- Use type aliases to avoid typing the full classname every time. -->

Propchange: camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: camel/trunk/components/camel-mybatis/src/test/resources/SqlMapConfig.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Modified: camel/trunk/components/camel-mybatis/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mybatis/src/test/resources/log4j.properties?rev=1070801&r1=1070800&r2=1070801&view=diff
==============================================================================
--- camel/trunk/components/camel-mybatis/src/test/resources/log4j.properties (original)
+++ camel/trunk/components/camel-mybatis/src/test/resources/log4j.properties Tue Feb 15 08:13:10 2011
@@ -18,11 +18,17 @@
 #
 # The logging properties used for testing
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=INFO, file
 
 # uncomment the following to enable debug of Camel
 #log4j.logger.org.apache.camel=DEBUG
-#log4j.logger.com.mybatis=DEBUG
+
+# mybatis logging
+#log4j.logger.org.apache.ibatis=TRACE
+#log4j.logger.java.sql.Connection=DEBUG
+#log4j.logger.java.sql.Statement=DEBUG
+#log4j.logger.java.sql.PreparedStatement=DEBUG
+#log4j.logger.java.sql.ResultSet=DEBUG
 
 # CONSOLE appender not used by default
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender