You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2014/06/27 11:27:59 UTC

git commit: polished the code a bit, added some more tests and removed a deprecated API call

Repository: camel
Updated Branches:
  refs/heads/master 745aa5ce4 -> fa9b6e5ef


polished the code a bit, added some more tests and removed a deprecated API call


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fa9b6e5e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fa9b6e5e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fa9b6e5e

Branch: refs/heads/master
Commit: fa9b6e5efbbff588d2a2c2f982c6e26cdebf39be
Parents: 745aa5c
Author: cmueller <cm...@apache.org>
Authored: Fri Jun 27 11:27:29 2014 +0200
Committer: cmueller <cm...@apache.org>
Committed: Fri Jun 27 11:27:48 2014 +0200

----------------------------------------------------------------------
 .../camel/component/hbase/HBaseConsumer.java    |  3 --
 .../camel/component/hbase/HBaseProducer.java    |  8 ++---
 .../idempotent/HBaseIdempotentRepository.java   |  1 +
 .../component/hbase/CamelHBaseFilterTest.java   | 31 +++-------------
 .../component/hbase/CamelHBaseTestSupport.java  | 29 +++++++++++++++
 .../component/hbase/HBaseConsumerTest.java      | 30 ++--------------
 .../component/hbase/HBaseConvertionsTest.java   | 30 ++--------------
 .../component/hbase/HBaseProducerTest.java      | 37 +++-----------------
 .../HBaseIdempotentRepositoryTest.java          | 16 +++++++--
 9 files changed, 59 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
index 67a3050..c803418 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
@@ -149,7 +149,6 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
-
     @Override
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
@@ -206,5 +205,3 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
         this.rowModel = rowModel;
     }
 }
-
-

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index 7d4ab3d..b9c5ebb 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
  * The HBase producer.
  */
 public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
+
     private HBaseEndpoint endpoint;
     private String tableName;
     private final HTablePool tablePool;
@@ -59,7 +60,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
         this.rowModel = endpoint.getRowModel();
     }
 
-
     public void process(Exchange exchange) throws Exception {
         HTableInterface table = tablePool.getTable(tableName.getBytes());
         try {
@@ -101,11 +101,10 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
                 mappingStrategy.applyScanResults(exchange.getOut(), new HBaseData(scanOperationResult));
             }
         } finally {
-            tablePool.putTable(table);
+            table.close();
         }
     }
 
-
     /**
      * Creates an HBase {@link Put} on a specific row, using a collection of values (family/column/value pairs).
      *
@@ -179,7 +178,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
         return resultRow;
     }
 
-
     /**
      * Creates an HBase {@link Delete} on a specific row, using a collection of values (family/column/value pairs).
      *
@@ -192,7 +190,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
         return new Delete(endpoint.getCamelContext().getTypeConverter().convertTo(byte[].class, hRow.getId()));
     }
 
-
     /**
      * Perfoms an HBase {@link Get} on a specific row, using a collection of values (family/column/value pairs).
      * The result is <p>the most recent entry</p> for each column.
@@ -247,7 +244,6 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
         return rowSet;
     }
 
-
     /**
      * This methods fill possible gaps in the {@link Exchange} headers, with values passed from the Endpoint.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
index 4e385b0..4459bba 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepository.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HBaseIdempotentRepository extends ServiceSupport implements IdempotentRepository<Object> {
+
     private static final Logger LOG = LoggerFactory.getLogger(HBaseIdempotentRepository.class);
 
     private final String tableName;

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
index 3b9517e..eb3c050 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseFilterTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.hbase;
 
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -26,37 +27,13 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.hbase.filters.ModelAwareColumnMatchingFilter;
 import org.apache.camel.impl.JndiRegistry;
-import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
 
     List<Filter> filters = new LinkedList<Filter>();
 
-    @Before
-    public void setUp() throws Exception {
-        if (systemReady) {
-            try {
-                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families);
-            } catch (TableExistsException ex) {
-                //Ignore if table exists
-            }
-
-            super.setUp();
-        }
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (systemReady) {
-            hbaseUtil.deleteTable(PERSON_TABLE.getBytes());
-            super.tearDown();
-        }
-    }
-
     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry jndi = super.createRegistry();
@@ -92,11 +69,11 @@ public class CamelHBaseFilterTest extends CamelHBaseTestSupport {
             @Override
             public void configure() {
                 from("direct:start")
-                        .to("hbase://" + PERSON_TABLE);
+                    .to("hbase://" + PERSON_TABLE);
+
                 from("direct:scan")
-                        .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&filters=#myFilters");
+                    .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&filters=#myFilters");
             }
         };
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
index e246264..26ff675 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/CamelHBaseTestSupport.java
@@ -24,10 +24,15 @@ import org.apache.camel.test.junit4.CamelTestSupport;
 import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class CamelHBaseTestSupport extends CamelTestSupport {
 
@@ -40,6 +45,8 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport {
     protected static final String PERSON_TABLE = "person";
     protected static final String INFO_FAMILY = "info";
 
+    private static final Logger LOG = LoggerFactory.getLogger(CamelHBaseTestSupport.class);
+
     protected String[] key = {"1", "2", "3"};
     protected final String[] family = {"info", "birthdate", "address"};
     //comlumn[family][column]
@@ -66,6 +73,7 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport {
         try {
             hbaseUtil.startMiniCluster(numServers);
         } catch (Exception e) {
+            LOG.error("couldn't start HBase cluster.", e);
             systemReady = false;
         }
     }
@@ -77,6 +85,27 @@ public abstract class CamelHBaseTestSupport extends CamelTestSupport {
         }
     }
 
+    @Before
+    public void setUp() throws Exception {
+        if (systemReady) {
+            try {
+                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families);
+            } catch (TableExistsException ex) {
+                //Ignore if table exists
+            }
+
+            super.setUp();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (systemReady) {
+            hbaseUtil.deleteTable(PERSON_TABLE.getBytes());
+            super.tearDown();
+        }
+    }
+
     @Override
     public CamelContext createCamelContext() throws Exception {
         CamelContext context = new DefaultCamelContext(createRegistry());

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
index d8149c2..deda182 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConsumerTest.java
@@ -18,43 +18,19 @@ package org.apache.camel.component.hbase;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.camel.ProducerTemplate;
+
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class HBaseConsumerTest extends CamelHBaseTestSupport {
 
-    @Before
-    public void setUp() throws Exception {
-        if (systemReady) {
-            try {
-                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families);
-            } catch (TableExistsException ex) {
-                //Ignore if table exists
-            }
-
-            super.setUp();
-        }
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (systemReady) {
-            super.tearDown();
-        }
-    }
-
     @Test
     public void testPutMultiRowsAndConsume() throws Exception {
         if (systemReady) {
             MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
             mockEndpoint.expectedMessageCount(3);
 
-            ProducerTemplate template = context.createProducerTemplate();
             Map<String, Object> headers = new HashMap<String, Object>();
 
             for (int row = 0; row < key.length; row++) {
@@ -81,10 +57,10 @@ public class HBaseConsumerTest extends CamelHBaseTestSupport {
             @Override
             public void configure() {
                 from("direct:start")
-                        .to("hbase://" + PERSON_TABLE);
+                    .to("hbase://" + PERSON_TABLE);
 
                 from("hbase://" + PERSON_TABLE)
-                        .to("mock:result");
+                    .to("mock:result");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
index ab75381..1313936 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseConvertionsTest.java
@@ -18,17 +18,15 @@ package org.apache.camel.component.hbase;
 
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class HBaseConvertionsTest extends CamelHBaseTestSupport {
@@ -38,27 +36,6 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport {
     protected final String[] column = {"DEFAULTCOLUMN"};
     protected final byte[][] families = {INFO_FAMILY.getBytes()};
 
-    @Before
-    public void setUp() throws Exception {
-        if (systemReady) {
-            try {
-                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families);
-            } catch (TableExistsException ex) {
-                //Ignore if table exists
-            }
-
-            super.setUp();
-        }
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (systemReady) {
-            hbaseUtil.deleteTable(PERSON_TABLE.getBytes());
-            super.tearDown();
-        }
-    }
-
     @Test
     public void testPutMultiRows() throws Exception {
         if (systemReady) {
@@ -111,7 +88,6 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport {
         }
     }
 
-
     /**
      * Factory method which derived classes can use to create a {@link org.apache.camel.builder.RouteBuilder}
      * to define the routes for testing
@@ -122,10 +98,10 @@ public class HBaseConvertionsTest extends CamelHBaseTestSupport {
             @Override
             public void configure() {
                 from("direct:start")
-                        .to("hbase://" + PERSON_TABLE);
+                    .to("hbase://" + PERSON_TABLE);
 
                 from("direct:scan")
-                        .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&family=family1&qualifier=column1");
+                    .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2&family=family1&qualifier=column1");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
index 6a259b1..c66b993 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/HBaseProducerTest.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -27,41 +28,16 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 public class HBaseProducerTest extends CamelHBaseTestSupport {
 
-    @Before
-    public void setUp() throws Exception {
-        if (systemReady) {
-            try {
-                hbaseUtil.createTable(HBaseHelper.getHBaseFieldAsBytes(PERSON_TABLE), families);
-            } catch (TableExistsException ex) {
-                //Ignore if table exists
-            }
-
-            super.setUp();
-        }
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        if (systemReady) {
-            hbaseUtil.deleteTable(PERSON_TABLE.getBytes());
-            super.tearDown();
-        }
-    }
-
     @Test
     public void testPut() throws Exception {
         if (systemReady) {
-            ProducerTemplate template = context.createProducerTemplate();
             Map<String, Object> headers = new HashMap<String, Object>();
             headers.put(HbaseAttribute.HBASE_ROW_ID.asHeader(), key[0]);
             headers.put(HbaseAttribute.HBASE_FAMILY.asHeader(), family[0]);
@@ -83,7 +59,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
         }
     }
 
-
     @Test
     public void testPutAndGet() throws Exception {
         testPut();
@@ -132,7 +107,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
         }
     }
 
-
     @Test
     public void testPutMultiRows() throws Exception {
         if (systemReady) {
@@ -185,7 +159,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
         }
     }
 
-
     @Test
     public void testPutMultiColumns() throws Exception {
         if (systemReady) {
@@ -217,7 +190,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
         }
     }
 
-
     @Test
     public void testPutAndGetMultiColumns() throws Exception {
         testPutMultiColumns();
@@ -239,7 +211,6 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
         }
     }
 
-
     @Test
     public void testPutAndGetAndDeleteMultiRows() throws Exception {
         testPutMultiRows();
@@ -297,13 +268,13 @@ public class HBaseProducerTest extends CamelHBaseTestSupport {
             @Override
             public void configure() {
                 from("direct:start")
-                        .to("hbase://" + PERSON_TABLE);
+                    .to("hbase://" + PERSON_TABLE);
 
                 from("direct:start-with-model")
-                        .to("hbase://" + PERSON_TABLE + "?family=info&qualifier=firstName&family2=birthdate&qualifier2=year");
+                    .to("hbase://" + PERSON_TABLE + "?family=info&qualifier=firstName&family2=birthdate&qualifier2=year");
 
                 from("direct:scan")
-                        .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
+                    .to("hbase://" + PERSON_TABLE + "?operation=" + HBaseConstants.SCAN + "&maxResults=2");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/fa9b6e5e/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
index 20e3766..352c59c 100644
--- a/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
+++ b/components/camel-hbase/src/test/java/org/apache/camel/component/hbase/processor/idempotent/HBaseIdempotentRepositoryTest.java
@@ -69,6 +69,9 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
             // try to add an other one
             assertTrue(repository.add(key02));
             assertTrue(repository.contains(key02));
+
+            // try to add the first key again
+            assertFalse(repository.add(key01));
         }
     }
 
@@ -101,6 +104,14 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
     }
 
     @Test
+    public void testConfirm() throws Exception {
+        if (systemReady) {
+            // it always returns true
+            assertTrue(repository.confirm(key01));
+        }
+    }
+
+    @Test
     public void testRepositoryInRoute() throws Exception {
         if (systemReady) {
             MockEndpoint mock = (MockEndpoint) context.getEndpoint("mock:out");
@@ -127,10 +138,9 @@ public class HBaseIdempotentRepositoryTest extends CamelHBaseTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:in")
-                        .idempotentConsumer(header("messageId"), repository)
-                        .to("mock:out");
+                    .idempotentConsumer(header("messageId"), repository)
+                    .to("mock:out");
             }
         };
     }
-
 }