You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/06/23 11:24:19 UTC
camel git commit: CAMEL-11439 - Camel-Caffeine: Create an Aggregation
Repository using Caffeine
Repository: camel
Updated Branches:
refs/heads/master 73410a0c7 -> d5e35c95b
CAMEL-11439 - Camel-Caffeine: Create an Aggregation Repository using Caffeine
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d5e35c95
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d5e35c95
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d5e35c95
Branch: refs/heads/master
Commit: d5e35c95b6d751ed4bf7c97c5e5737f49959864a
Parents: 73410a0
Author: Andrea Cosentino <an...@gmail.com>
Authored: Fri Jun 23 13:22:38 2017 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Jun 23 13:22:38 2017 +0200
----------------------------------------------------------------------
.../CaffeineAggregationRepository.java | 205 +++++++++++++++++++
...feineAggregationRepositoryOperationTest.java | 198 ++++++++++++++++++
...CaffeineAggregationRepositoryRoutesTest.java | 103 ++++++++++
3 files changed, 506 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java
new file mode 100644
index 0000000..702a8e6
--- /dev/null
+++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepository.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.aggregate;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RecoverableAggregationRepository} that stores exchanges in a Caffeine {@link Cache}.
+ * <p>
+ * Exchanges are marshalled into {@link DefaultExchangeHolder} instances and stored under the String key
+ * passed to {@link #add(CamelContext, String, Exchange)}. When no cache has been supplied through
+ * {@link #setCache(Cache)} before the service starts, one is created from a default {@link Caffeine} builder.
+ */
+public class CaffeineAggregationRepository extends ServiceSupport implements RecoverableAggregationRepository {
+    private static final Logger LOG = LoggerFactory.getLogger(CaffeineAggregationRepository.class);
+
+    private CamelContext camelContext;
+    private Cache<String, DefaultExchangeHolder> cache;
+    private boolean allowSerializedHeaders;
+
+    private boolean useRecovery = true;
+    private String deadLetterChannel;
+    // Stored in milliseconds, see setRecoveryInterval(long, TimeUnit).
+    private long recoveryInterval = 5000;
+    private int maximumRedeliveries = 3;
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * Returns the backing cache; null until one is set or the service has been started.
+     */
+    public Cache<String, DefaultExchangeHolder> getCache() {
+        return cache;
+    }
+
+    /**
+     * Sets the cache to use. If none is set, {@link #doStart()} creates a default one.
+     */
+    public void setCache(Cache<String, DefaultExchangeHolder> cache) {
+        this.cache = cache;
+    }
+
+    public boolean isAllowSerializedHeaders() {
+        return allowSerializedHeaders;
+    }
+
+    /**
+     * Sets the flag handed to {@link DefaultExchangeHolder#marshal} when an exchange is added.
+     */
+    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
+        this.allowSerializedHeaders = allowSerializedHeaders;
+    }
+
+    @Override
+    public void setDeadLetterUri(String deadLetterUri) {
+        this.deadLetterChannel = deadLetterUri;
+    }
+
+    @Override
+    public String getDeadLetterUri() {
+        return deadLetterChannel;
+    }
+
+    @Override
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    @Override
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
+    /**
+     * Returns the same value as {@link #getDeadLetterUri()}.
+     */
+    public String getDeadLetterChannel() {
+        return deadLetterChannel;
+    }
+
+    /**
+     * Sets the same value as {@link #setDeadLetterUri(String)}.
+     */
+    public void setDeadLetterChannel(String deadLetterChannel) {
+        this.deadLetterChannel = deadLetterChannel;
+    }
+
+    /**
+     * Returns the recovery interval in milliseconds.
+     */
+    public long getRecoveryInterval() {
+        return recoveryInterval;
+    }
+
+    @Override
+    public long getRecoveryIntervalInMillis() {
+        return recoveryInterval;
+    }
+
+    /**
+     * Sets the recovery interval in milliseconds.
+     */
+    @Override
+    public void setRecoveryInterval(long recoveryInterval) {
+        this.recoveryInterval = recoveryInterval;
+    }
+
+    /**
+     * Sets the recovery interval, converting the given amount to milliseconds.
+     */
+    @Override
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryInterval = timeUnit.toMillis(interval);
+    }
+
+    @Override
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    @Override
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    /**
+     * Stores the given exchange under the key, replacing any entry already present.
+     * <p>
+     * The write goes through the cache's map view so that the previous holder is returned by the very
+     * operation that stores the new one. A separate look-up followed by a put could hand back a stale
+     * "old" exchange when another thread updates the same key in between.
+     *
+     * @param camelContext the context used to re-create the previous exchange
+     * @param key          the key to store the exchange under
+     * @param exchange     the exchange to store
+     * @return the exchange previously stored under the key, or null if there was none
+     */
+    @Override
+    public Exchange add(final CamelContext camelContext, final String key, final Exchange exchange) {
+        LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key);
+
+        final DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders);
+        final DefaultExchangeHolder oldHolder = cache.asMap().put(key, newHolder);
+
+        return unmarshallExchange(camelContext, oldHolder);
+    }
+
+    /**
+     * Returns the exchange stored under the key, or null if the cache has no entry for it.
+     */
+    @Override
+    public Exchange get(CamelContext camelContext, String key) {
+        return unmarshallExchange(camelContext, cache.getIfPresent(key));
+    }
+
+    /**
+     * Invalidates the cache entry for the key. The exchange is only used for logging.
+     */
+    @Override
+    public void remove(CamelContext camelContext, String key, Exchange exchange) {
+        LOG.trace("Removing an exchange with ID {} for key {} ", exchange.getExchangeId(), key);
+        cache.invalidate(key);
+    }
+
+    /**
+     * Invalidates the cache entry whose key equals the given id.
+     * <p>
+     * NOTE(review): entries are written by {@link #add} under the aggregation key, so this only removes
+     * something when the id passed in matches such a key - confirm this is the intended contract.
+     */
+    @Override
+    public void confirm(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Confirming an exchange with ID {}.", exchangeId);
+        cache.invalidate(exchangeId);
+    }
+
+    /**
+     * Returns an unmodifiable view of the keys currently held by the cache.
+     */
+    @Override
+    public Set<String> getKeys() {
+        Set<String> keys = cache.asMap().keySet();
+
+        return Collections.unmodifiableSet(keys);
+    }
+
+    /**
+     * Returns the same key set as {@link #getKeys()}, logging how many keys were found.
+     */
+    @Override
+    public Set<String> scan(CamelContext camelContext) {
+        LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName());
+        // getKeys() already hands out an unmodifiable set, no need to wrap it a second time
+        Set<String> scanned = getKeys();
+        LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), camelContext.getName());
+        return scanned;
+    }
+
+    /**
+     * Looks up the cache entry whose key equals the given id and re-creates the exchange from it.
+     * <p>
+     * NOTE(review): like {@link #confirm}, the look-up uses the id as cache key although entries are
+     * stored under the aggregation key - confirm against the callers.
+     *
+     * @return the exchange, or null when recovery is disabled or the cache has no such entry
+     */
+    @Override
+    public Exchange recover(CamelContext camelContext, String exchangeId) {
+        LOG.trace("Recovering an Exchange with ID {}.", exchangeId);
+        return useRecovery ? unmarshallExchange(camelContext, cache.getIfPresent(exchangeId)) : null;
+    }
+
+    /**
+     * Validates the configuration and creates a default cache when none has been set.
+     *
+     * @throws IllegalArgumentException if the maximum redeliveries or the recovery interval is negative
+     */
+    @Override
+    protected void doStart() throws Exception {
+        if (maximumRedeliveries < 0) {
+            throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer.");
+        }
+        if (recoveryInterval < 0) {
+            throw new IllegalArgumentException("Recovery interval must be zero or a positive integer.");
+        }
+
+        if (cache == null) {
+            cache = Caffeine.newBuilder().build();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // nothing to release, the cache is left as is
+    }
+
+    /**
+     * Re-creates an exchange from the given holder.
+     *
+     * @param camelContext the context the new exchange is created with
+     * @param holder       the holder to read from, may be null
+     * @return a new exchange populated from the holder, or null if the holder is null
+     */
+    public static Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
+        if (holder == null) {
+            return null;
+        }
+
+        Exchange exchange = new DefaultExchange(camelContext);
+        DefaultExchangeHolder.unmarshal(exchange, holder);
+        return exchange;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java
new file mode 100644
index 0000000..754f7f3
--- /dev/null
+++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryOperationTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.aggregate;
+
+import java.util.Set;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Exercises the individual operations of {@link CaffeineAggregationRepository} against a repository
+ * that is created and started before each test and stopped afterwards.
+ */
+public class CaffeineAggregationRepositoryOperationTest extends CamelTestSupport {
+    private CaffeineAggregationRepository aggregationRepository;
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        super.doPreSetup();
+
+        aggregationRepository = new CaffeineAggregationRepository();
+        aggregationRepository.start();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        // make sure the Camel test support is torn down even when stopping the repository fails
+        try {
+            aggregationRepository.stop();
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Tells whether the repository's cache currently holds an entry for the key.
+     */
+    private boolean exists(String key) {
+        DefaultExchangeHolder holder = aggregationRepository.getCache().getIfPresent(key);
+        return holder != null;
+    }
+
+    @Test
+    public void testAdd() {
+        // Given
+        String key = "Add";
+        assertFalse(exists(key));
+        Exchange exchange = new DefaultExchange(context());
+        // When
+        aggregationRepository.add(context(), key, exchange);
+        // Then
+        assertTrue(exists(key));
+    }
+
+    @Test
+    public void testGetExists() {
+        // Given
+        String key = "Get_Exists";
+        Exchange exchange = new DefaultExchange(context());
+        aggregationRepository.add(context(), key, exchange);
+        assertTrue(exists(key));
+
+        // When
+        Exchange exchange2 = aggregationRepository.get(context(), key);
+        // Then
+        assertNotNull(exchange2);
+        assertEquals(exchange.getExchangeId(), exchange2.getExchangeId());
+    }
+
+    @Test
+    public void testGetNotExists() {
+        // Given
+        String key = "Get_NotExists";
+        assertFalse(exists(key));
+        // When
+        Exchange exchange2 = aggregationRepository.get(context(), key);
+        // Then
+        assertNull(exchange2);
+    }
+
+    @Test
+    public void testRemoveExists() {
+        // Given
+        String key = "Remove_Exists";
+        Exchange exchange = new DefaultExchange(context());
+        aggregationRepository.add(context(), key, exchange);
+        assertTrue(exists(key));
+        // When
+        aggregationRepository.remove(context(), key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testRemoveNotExists() {
+        // Given
+        String key = "RemoveNotExists";
+        Exchange exchange = new DefaultExchange(context());
+        assertFalse(exists(key));
+        // When
+        aggregationRepository.remove(context(), key, exchange);
+        // Then
+        assertFalse(exists(key));
+    }
+
+    @Test
+    public void testGetKeys() {
+        // Given
+        String[] keys = {"GetKeys1", "GetKeys2"};
+        addExchanges(keys);
+        // When
+        Set<String> keySet = aggregationRepository.getKeys();
+        // Then
+        for (String key : keys) {
+            assertTrue(keySet.contains(key));
+        }
+    }
+
+    @Test
+    public void testConfirmExist() {
+        // Given
+        for (int i = 1; i < 4; i++) {
+            String key = "Confirm_" + i;
+            Exchange exchange = new DefaultExchange(context());
+            exchange.setExchangeId("Exchange_" + i);
+            aggregationRepository.add(context(), key, exchange);
+            assertTrue(exists(key));
+        }
+        // When
+        // the value passed to confirm is the key the exchange was added under, not "Exchange_2"
+        aggregationRepository.confirm(context(), "Confirm_2");
+        // Then
+        assertTrue(exists("Confirm_1"));
+        assertFalse(exists("Confirm_2"));
+        assertTrue(exists("Confirm_3"));
+    }
+
+    @Test
+    public void testConfirmNotExist() {
+        // Given
+        String[] keys = new String[3];
+        for (int i = 1; i < 4; i++) {
+            keys[i - 1] = "Confirm" + i;
+        }
+        addExchanges(keys);
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+        // When
+        aggregationRepository.confirm(context(), "Exchange-Confirm5");
+        // Then
+        for (String key : keys) {
+            assertTrue(exists(key));
+        }
+    }
+
+    /**
+     * Adds one exchange per key, each with the exchange id "Exchange-" followed by its key.
+     */
+    private void addExchanges(String... keys) {
+        for (String key : keys) {
+            Exchange exchange = new DefaultExchange(context());
+            exchange.setExchangeId("Exchange-" + key);
+            aggregationRepository.add(context(), key, exchange);
+        }
+    }
+
+    @Test
+    public void testScan() {
+        // Given
+        String[] keys = {"Scan1", "Scan2"};
+        addExchanges(keys);
+        // When
+        Set<String> exchangeIdSet = aggregationRepository.scan(context());
+        // Then
+        for (String key : keys) {
+            assertTrue(exchangeIdSet.contains(key));
+        }
+    }
+
+    @Test
+    public void testRecover() {
+        // Given
+        String[] keys = {"Recover1", "Recover2"};
+        addExchanges(keys);
+        // When
+        Exchange exchange2 = aggregationRepository.recover(context(), "Recover2");
+        Exchange exchange3 = aggregationRepository.recover(context(), "Recover3");
+        // Then
+        assertNotNull(exchange2);
+        assertNull(exchange3);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d5e35c95/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java
new file mode 100644
index 0000000..2c60215
--- /dev/null
+++ b/components/camel-caffeine/src/test/java/org/apache/camel/component/caffeine/processor/aggregate/CaffeineAggregationRepositoryRoutesTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.caffeine.processor.aggregate;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Sends a series of random integers through a route that aggregates them with a
+ * {@link CaffeineAggregationRepository} and checks that the single aggregated result carries their sum.
+ */
+public class CaffeineAggregationRepositoryRoutesTest extends CamelTestSupport {
+    private static final String ENDPOINT_MOCK = "mock:result";
+    private static final String ENDPOINT_DIRECT = "direct:one";
+    private static final int[] VALUES = generateRandomArrayOfInt(10, 0, 30);
+    private static final int SUM = IntStream.of(VALUES).sum();
+    private static final String CORRELATOR = "CORRELATOR";
+
+    @EndpointInject(uri = ENDPOINT_MOCK)
+    private MockEndpoint mock;
+
+    @Produce(uri = ENDPOINT_DIRECT)
+    private ProducerTemplate producer;
+
+    @Test
+    public void checkAggregationFromOneRoute() throws Exception {
+        // The route completes after VALUES.length messages sharing one correlation value, so exactly
+        // one aggregated exchange is expected at the mock endpoint.
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived(SUM);
+
+        IntStream.of(VALUES).forEach(
+            i -> producer.sendBodyAndHeader(i, CORRELATOR, CORRELATOR)
+        );
+
+        mock.assertIsSatisfied();
+    }
+
+    /**
+     * Aggregation strategy that adds up the Integer bodies of the two exchanges.
+     *
+     * @param oldExchange the aggregate so far, null for the first message
+     * @param newExchange the incoming exchange
+     * @return the new exchange for the first message, otherwise the old exchange with the summed body;
+     *         a body that cannot be read as Integer counts as zero
+     */
+    private Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        Integer n = newExchange.getIn().getBody(Integer.class);
+        Integer o = oldExchange.getIn().getBody(Integer.class);
+        Integer v = (o == null ? 0 : o) + (n == null ? 0 : n);
+
+        oldExchange.getIn().setBody(v, Integer.class);
+
+        return oldExchange;
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(ENDPOINT_DIRECT)
+                    .routeId("AggregatingRouteOne")
+                    .aggregate(header(CORRELATOR))
+                    .aggregationRepository(createAggregateRepository())
+                    .aggregationStrategy(CaffeineAggregationRepositoryRoutesTest.this::aggregate)
+                    .completionSize(VALUES.length)
+                    .to("log:org.apache.camel.component.caffeine.processor.aggregate?level=INFO&showAll=true&multiline=true")
+                    .to(ENDPOINT_MOCK);
+            }
+        };
+    }
+
+    /**
+     * Creates an array of random integers.
+     *
+     * @param size  the number of elements
+     * @param lower the smallest possible value (inclusive)
+     * @param upper the upper bound of the values (exclusive)
+     * @return the populated array
+     */
+    protected static int[] generateRandomArrayOfInt(int size, int lower, int upper) {
+        Random random = new Random();
+        int[] array = new int[size];
+
+        Arrays.setAll(array, i -> random.nextInt(upper - lower) + lower);
+
+        return array;
+    }
+
+    /**
+     * Creates the repository handed to the aggregator; it is returned without being started here.
+     */
+    protected CaffeineAggregationRepository createAggregateRepository() throws Exception {
+        return new CaffeineAggregationRepository();
+    }
+}