You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/06/23 11:24:19 UTC

camel git commit: CAMEL-11439 - Camel-Caffeine: Create an Aggregation Repository using Caffeine

Repository: camel
Updated Branches:
  refs/heads/master 73410a0c7 -> d5e35c95b


CAMEL-11439 - Camel-Caffeine: Create an Aggregation Repository using Caffeine


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d5e35c95
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d5e35c95
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d5e35c95

Branch: refs/heads/master
Commit: d5e35c95b6d751ed4bf7c97c5e5737f49959864a
Parents: 73410a0
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri Jun 23 13:22:38 2017 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Jun 23 13:22:38 2017 +0200

----------------------------------------------------------------------
 .../CaffeineAggregationRepository.java          | 205 +++++++++++++++++++
 ...feineAggregationRepositoryOperationTest.java | 198 ++++++++++++++++++
 ...CaffeineAggregationRepositoryRoutesTest.java | 103 ++++++++++
 3 files changed, 506 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java
new file mode 100644
index 0000000..702a8e6
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.aggregate;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CaffeineAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository {
+    private static final Logger LOG = LoggerFactory.getLogger(CaffeineAggregationRepository.class);
+
+    private CamelContext camelContext;
+    private Cache<String, DefaultExchangeHolder> cache;
+    private boolean allowSerializedHeaders;
+
+    private boolean useRecovery = true;
+    private String deadLetterChannel;
+    private long recoveryInterval = 5000;
+    private int maximumRedeliveries = 3;
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public Cache<String, DefaultExchangeHolder> getCache() {
+        return cache;
+    }
+
+    public void setCache(Cache<String, DefaultExchangeHolder> cache) {
+        this.cache = cache;
+    }
+
+    public boolean isAllowSerializedHeaders() {
+        return allowSerializedHeaders;
+    }
+
+    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
+        this.allowSerializedHeaders = allowSerializedHeaders;
+    }
+
+    @Override
+    public void setDeadLetterUri(String deadLetterUri) {
+        this.deadLetterChannel = deadLetterUri;
+    }
+
+    @Override
+    public String getDeadLetterUri() {
+        return deadLetterChannel;
+    }
+
+    @Override
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    @Override
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
+    public String getDeadLetterChannel() {
+        return deadLetterChannel;
+    }
+
+    public void setDeadLetterChannel(String deadLetterChannel) {
+        this.deadLetterChannel = deadLetterChannel;
+    }
+
+    public long getRecoveryInterval() {
+        return recoveryInterval;
+    }
+
+    @Override
+    public long getRecoveryIntervalInMillis() {
+        return recoveryInterval;
+    }
+
+    @Override
+    public void setRecoveryInterval(long recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryInterval = timeUnit.toMillis(interval);
+    }
+
+    @Override
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    @Override
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    @Override
+    public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) {
+        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+
+        final DefaultExchangeHolder oldHolder = cache.getIfPresent(key);
+        final DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
+
+        cache.put(key, newHolder);
+
+        return unmarshallExchange(camelContext, oldHolder);
+    }
+
+    @Override
+    public Exchange get(CamelContext camelContext, String key) {
+        return unmarshallExchange(camelContext, cache.getIfPresent(key));
+    }
+
+    @Override
+    public void remove(CamelContext camelContext, String key, Exchange exchange) {
+        LOG.trace("Removing an exchange with ID {} for key {} ", exchange.getExchangeId(), key);
+        cache.invalidate(key);
+    }
+
+    @Override
+    public void confirm(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Confirming an exchange with ID {}.", exchangeId);
+        cache.invalidate(exchangeId);
+    }
+
+    @Override
+    public Set<String> getKeys() {
+        Set<String> keys = cache.asMap().keySet();
+
+        return Collections.unmodifiableSet(keys);
+    }
+
+    @Override
+    public Set<String> scan(CamelContext camelContext) {
+        LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
+        Set<String> scanned = Collections.unmodifiableSet(getKeys());
+        LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName());
+        return scanned;
+    }
+
+    @Override
+    public Exchange recover(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
+        return useRecovery ? unmarshallExchange(camelContext, cache.getIfPresent(exchangeId)) : null;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (maximumRedeliveries < 0) {
+            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
+        }
+        if (recoveryInterval < 0) {
+            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
+        }
+
+        if (cache == null) {
+            Caffeine<Object, Object> builder = Caffeine.newBuilder();
+            cache = builder.build();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+    }
+
+    public static Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
+        Exchange exchange = null;
+        if (holder != null) {
+            exchange = new DefaultExchange(camelContext);
+            DefaultExchangeHolder.unmarshal(exchange, holder);
+        }
+
+        return exchange;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java
new file mode 100644
index 0000000..754f7f3
--- /dev/null
+++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.aggregate;
+
+import java.util.Set;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class CaffeineAggregationRepositoryOperationTest extends CamelTestSupport {
+    private CaffeineAggregationRepository aggregationRepository;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        super.doPreSetup();
+
+        aggregationRepository = new CaffeineAggregationRepository();
+        aggregationRepository.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        aggregationRepository.stop();
+        super.tearDown();
+    }
+
+    private boolean exists(String key) {
+        DefaultExchangeHolder holder = aggregationRepository.getCache().getIfPresent(key);
+        if (holder == null) {
+            return false;
+        }
+        return true;
+    }
+
+    @Test
+    public void testAdd() {
+        // Given
+        String key = "Add";
+        assertFalse(exists(key));
+        Exchange exchange = new DefaultExchange(context());
+        // When
+        aggregationRepository.add(context(), key, exchange);
+        // Then
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testGetExists() {
+        // Given
+        String key = "Get_Exists";
+        Exchange exchange = new DefaultExchange(context());
+        aggregationRepository.add(context(), key, exchange);
+        assertTrue(exists(key));
+
+        // When
+        Exchange exchange2 = aggregationRepository.get(context(), key);
+        // Then
+        assertNotNull(exchange2);
+        assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
+    }
+
+    @Test
+    public void testGetNotExists() {
+        // Given
+        String key = "Get_NotExists";
+        assertFalse(exists(key));
+        // When
+        Exchange exchange2 = aggregationRepository.get(context(), key);
+        // Then
+        assertNull(exchange2);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        Exchange exchange = new DefaultExchange(context());
+        aggregationRepository.add(context(), key, exchange);
+        assertTrue(exists(key));
+        // When
+        aggregationRepository.remove(context(), key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "RemoveNotExists";
+        Exchange exchange = new DefaultExchange(context());
+        assertFalse(exists(key));
+        // When
+        aggregationRepository.remove(context(), key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testGetKeys() {
+        // Given
+        String[] keys = {"GetKeys1", "GetKeys2"};
+        addExchanges(keys);
+        // When
+        Set<String> keySet = aggregationRepository.getKeys();
+        // Then
+        for (String key : keys) {
+            assertTrue(keySet.contains(key));
+        }
+    }
+
+    @Test
+    public void testConfirmExist() {
+        // Given
+        for (int i = 1; i < 4; i++) {
+            String key = "Confirm_" + i;
+            Exchange exchange = new DefaultExchange(context());
+            exchange.setExchangeId("Exchange_" + i);
+            aggregationRepository.add(context(), key, exchange);
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(context(), "Confirm_2");
+        // Then
+        assertTrue(exists("Confirm_1"));
+        assertFalse(exists("Confirm_2"));
+        assertTrue(exists("Confirm_3"));
+    }
+
+    @Test
+    public void testConfirmNotExist() {
+        // Given
+        String[] keys = new String[3];
+        for (int i = 1; i < 4; i++) {
+            keys[i - 1] = "Confirm" + i;
+        }
+        addExchanges(keys);
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(context(), "Exchange-Confirm5");
+        // Then
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+    }
+
+    private void addExchanges(String... keys) {
+        for (String key : keys) {
+            Exchange exchange = new DefaultExchange(context());
+            exchange.setExchangeId("Exchange-" + key);
+            aggregationRepository.add(context(), key, exchange);
+        }
+    }
+
+    @Test
+    public void testScan() {
+        // Given
+        String[] keys = {"Scan1", "Scan2"};
+        addExchanges(keys);
+        // When
+        Set<String> exchangeIdSet = aggregationRepository.scan(context());
+        // Then
+        for (String key : keys) {
+            assertTrue(exchangeIdSet.contains(key));
+        }
+    }
+
+    @Test
+    public void testRecover() {
+        // Given
+        String[] keys = {"Recover1", "Recover2"};
+        addExchanges(keys);
+        // When
+        Exchange exchange2 = aggregationRepository.recover(context(), "Recover2");
+        Exchange exchange3 = aggregationRepository.recover(context(), "Recover3");
+        // Then
+        assertNotNull(exchange2);
+        assertNull(exchange3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java
new file mode 100644
index 0000000..2c60215
--- /dev/null
+++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.aggregate;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class CaffeineAggregationRepositoryRoutesTest extends CamelTestSupport {
+    private static final String ENDPOINT_MOCK = "mock:result";
+    private static final String ENDPOINT_DIRECT = "direct:one";
+    private static final int[] VALUES = generateRandomArrayOfInt(10, 0, 30);
+    private static final int SUM = IntStream.of(VALUES).reduce(0, (a, b) -> a + b);
+    private static final String CORRELATOR = "CORRELATOR";
+
+    @EndpointInject(uri = ENDPOINT_MOCK)
+    private MockEndpoint mock;
+
+    @Produce(uri = ENDPOINT_DIRECT)
+    private ProducerTemplate producer;
+
+    @Test
+    public void checkAggregationFromOneRoute() throws Exception {
+        mock.expectedMessageCount(VALUES.length);
+        mock.expectedBodiesReceived(SUM);
+
+        IntStream.of(VALUES).forEach(
+            i -> producer.sendBodyAndHeader(i, CORRELATOR, CORRELATOR)
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    private Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        } else {
+            Integer n = newExchange.getIn().getBody(Integer.class);
+            Integer o = oldExchange.getIn().getBody(Integer.class);
+            Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
+
+            oldExchange.getIn().setBody(v, Integer.class);
+
+            return oldExchange;
+        }
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(ENDPOINT_DIRECT)
+                    .routeId("AggregatingRouteOne")
+                    .aggregate(header(CORRELATOR))
+                    .aggregationRepository(createAggregateRepository())
+                    .aggregationStrategy(CaffeineAggregationRepositoryRoutesTest.this::aggregate)
+                    .completionSize(VALUES.length)
+                        .to("log:org.apache.camel.component.caffeine.processor.aggregate?level=INFO&showAll=true&multiline=true")
+                        .to(ENDPOINT_MOCK);
+            }
+        };
+    }
+    
+    protected static int[] generateRandomArrayOfInt(int size, int lower, int upper) {
+        Random random = new Random();
+        int[] array = new int[size];
+
+        Arrays.setAll(array, i -> random.nextInt(upper - lower) + lower);
+
+        return array;
+    }
+    
+    protected CaffeineAggregationRepository createAggregateRepository() throws Exception {
+        CaffeineAggregationRepository repository = new CaffeineAggregationRepository();
+
+        return repository;
+    }
+}