You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by an...@apache.org on 2016/07/07 20:43:55 UTC
camel git commit: CAMEL-10116: Retrieve last MessageHistory when getting NodeId and RouteId
Repository: camel
Updated Branches:
refs/heads/master 6483909c2 -> 946c8dffb
CAMEL-10116: Retrieve last MessageHistory when getting NodeId and RouteId
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/946c8dff
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/946c8dff
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/946c8dff
Branch: refs/heads/master
Commit: 946c8dffb69f5bc95392d3e27aeaf5212cd46c87
Parents: 6483909
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Tue Jul 5 20:24:35 2016 +0200
Committer: Arno Noordover <an...@users.noreply.github.com>
Committed: Thu Jul 7 22:42:00 2016 +0200
----------------------------------------------------------------------
.../impl/DefaultAsyncProcessorAwaitManager.java | 46 ++++--
.../org/apache/camel/impl/DefaultExchange.java | 3 +-
.../camel/impl/DefaultInflightRepository.java | 13 +-
.../camel/processor/CamelInternalProcessor.java | 3 +-
.../org/apache/camel/util/ExchangeHelper.java | 4 +-
.../DefaultAsyncProcessorAwaitManagerTest.java | 154 +++++++++++++++++++
.../async/AsyncProcessorAwaitManagerTest.java | 3 +-
7 files changed, 201 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 2712178..d0d306b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -18,7 +18,7 @@ package org.apache.camel.impl;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
+import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
+import org.apache.camel.NamedNode;
import org.apache.camel.processor.DefaultExchangeFormatter;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ExchangeFormatter;
@@ -240,23 +241,12 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
private final Exchange exchange;
private final CountDownLatch latch;
private final long start;
- private String routeId;
- private String nodeId;
private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
this.thread = thread;
this.exchange = exchange;
this.latch = latch;
this.start = System.currentTimeMillis();
-
- // capture details from message history if enabled
- List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
- if (list != null && !list.isEmpty()) {
- // grab last part
- MessageHistory history = list.get(list.size() - 1);
- routeId = history.getRouteId();
- nodeId = history.getNode() != null ? history.getNode().getId() : null;
- }
}
@Override
@@ -276,18 +266,46 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
@Override
public String getRouteId() {
- return routeId;
+ MessageHistory lastMessageHistory = getLastMessageHistory();
+ if (lastMessageHistory == null) {
+ return null;
+ }
+ return lastMessageHistory.getRouteId();
}
@Override
public String getNodeId() {
- return nodeId;
+ NamedNode node = getNode();
+ if (node == null) {
+ return null;
+ }
+ return node.getId();
}
public CountDownLatch getLatch() {
return latch;
}
+ private NamedNode getNode() {
+ MessageHistory lastMessageHistory = getLastMessageHistory();
+ if (lastMessageHistory == null) {
+ return null;
+ }
+ return lastMessageHistory.getNode();
+ }
+
+ private MessageHistory getLastMessageHistory() {
+ LinkedList<MessageHistory> list = getMessageHistories();
+ if (list == null || list.isEmpty()) {
+ return null;
+ }
+ return list.getLast();
+ }
+
+ private LinkedList<MessageHistory> getMessageHistories() {
+ return exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
+ }
+
@Override
public String toString() {
return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]";
http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
index 0091b6e..87b4ff0 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
@@ -17,6 +17,7 @@
package org.apache.camel.impl;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -150,7 +151,7 @@ public final class DefaultExchange implements Exchange {
// safe copy message history using a defensive copy
List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
if (history != null) {
- answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history));
+ answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
}
return answer;
http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index 6b65c24..4d0085a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -194,13 +195,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@Override
@SuppressWarnings("unchecked")
public long getElapsed() {
- List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+ LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
if (list == null || list.isEmpty()) {
return 0;
}
// get latest entry
- MessageHistory history = list.get(list.size() - 1);
+ MessageHistory history = list.getLast();
if (history != null) {
return history.getElapsed();
} else {
@@ -211,13 +212,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@Override
@SuppressWarnings("unchecked")
public String getNodeId() {
- List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+ LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
if (list == null || list.isEmpty()) {
return null;
}
// get latest entry
- MessageHistory history = list.get(list.size() - 1);
+ MessageHistory history = list.getLast();
if (history != null) {
return history.getNode().getId();
} else {
@@ -238,13 +239,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@Override
@SuppressWarnings("unchecked")
public String getAtRouteId() {
- List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+ LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
if (list == null || list.isEmpty()) {
return null;
}
// get latest entry
- MessageHistory history = list.get(list.size() - 1);
+ MessageHistory history = list.getLast();
if (history != null) {
return history.getRouteId();
} else {
http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 3c15167..a726b2f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
@@ -744,7 +745,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
public MessageHistory before(Exchange exchange) throws Exception {
List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
if (list == null) {
- list = new ArrayList<MessageHistory>();
+ list = new LinkedList<>();
exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
}
MessageHistory history = factory.newMessageHistory(routeId, definition, new Date());
http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 9641026..030b78d 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -16,8 +16,8 @@
*/
package org.apache.camel.util;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -915,7 +915,7 @@ public final class ExchangeHelper {
// safe copy message history using a defensive copy
List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
if (history != null) {
- answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history));
+ answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
}
return answer;
http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
new file mode 100644
index 0000000..b91354e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.MessageHistory;
+import org.apache.camel.NamedNode;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.MessageHistoryFactory;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+
+public class DefaultAsyncProcessorAwaitManagerTest {
+
+ private static final MessageHistoryFactory MESSAGE_HISTORY_FACTORY = new DefaultMessageHistoryFactory();
+ private DefaultAsyncProcessorAwaitManager defaultAsyncProcessorAwaitManager;
+ private DefaultExchange exchange;
+ private CountDownLatch latch;
+ private Thread thread;
+
+ @Test
+ public void testNoMessageHistory() throws Exception {
+ startAsyncProcess();
+ AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+ assertThat(awaitThread.getRouteId(), is(nullValue()));
+ assertThat(awaitThread.getNodeId(), is(nullValue()));
+ waitForEndOfAsyncProcess();
+ }
+
+ @Test
+ public void testMessageHistoryWithEmptyList() throws Exception {
+ startAsyncProcess();
+ exchange.setProperty(Exchange.MESSAGE_HISTORY, new LinkedList<MessageHistory>());
+ AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+ assertThat(awaitThread.getRouteId(), is(nullValue()));
+ assertThat(awaitThread.getNodeId(), is(nullValue()));
+ waitForEndOfAsyncProcess();
+ }
+
+ @Test
+ public void testMessageHistoryWithNullMessageHistory() throws Exception {
+ startAsyncProcess();
+ LinkedList<MessageHistory> messageHistories = new LinkedList<>();
+ messageHistories.add(null);
+ exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
+ AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+ assertThat(awaitThread.getRouteId(), is(nullValue()));
+ assertThat(awaitThread.getNodeId(), is(nullValue()));
+ waitForEndOfAsyncProcess();
+ }
+
+ @Test
+ public void testMessageHistoryWithNullElements() throws Exception {
+ startAsyncProcess();
+ LinkedList<MessageHistory> messageHistories = new LinkedList<>();
+ messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory(null,
+ new MockNamedNode().withId(null),
+ null));
+ exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
+ AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+ assertThat(awaitThread.getRouteId(), is(nullValue()));
+ assertThat(awaitThread.getNodeId(), is(nullValue()));
+ waitForEndOfAsyncProcess();
+ }
+
+ @Test
+ public void testMessageHistoryWithNotNullElements() throws Exception {
+ startAsyncProcess();
+ LinkedList<MessageHistory> messageHistories = new LinkedList<>();
+ messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory("routeId",
+ new MockNamedNode().withId("nodeId"),
+ null));
+ exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
+ AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+ assertThat(awaitThread.getRouteId(), is("routeId"));
+ assertThat(awaitThread.getNodeId(), is("nodeId"));
+ waitForEndOfAsyncProcess();
+ }
+
+    private void waitForEndOfAsyncProcess() throws InterruptedException {
+        // Release the awaiting thread, then block in join() rather than spinning on isAlive().
+        latch.countDown();
+        thread.join();
+    }
+
+ private void startAsyncProcess() throws InterruptedException {
+ defaultAsyncProcessorAwaitManager = new DefaultAsyncProcessorAwaitManager();
+ latch = new CountDownLatch(1);
+ BackgroundAwait backgroundAwait = new BackgroundAwait();
+ exchange = new DefaultExchange(new DefaultCamelContext());
+ thread = new Thread(backgroundAwait);
+ thread.start();
+ Thread.sleep(100);
+ }
+
+
+ private class BackgroundAwait implements Runnable {
+
+ @Override
+ public void run() {
+ defaultAsyncProcessorAwaitManager.await(exchange, latch);
+ }
+ }
+
+ private static class MockNamedNode implements NamedNode {
+
+ private String id;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public String getShortName() {
+ return this.getClass().getSimpleName();
+ }
+
+ @Override
+ public String getLabel() {
+ return this.getClass().getName();
+ }
+
+ @Override
+ public String getDescriptionText() {
+ return this.getClass().getCanonicalName();
+ }
+
+ public MockNamedNode withId(String id) {
+ this.id = id;
+ return this;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
index 1fc4635..b56b279 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -73,7 +73,8 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
log.info("Thread {} has waited for {} msec.", thread.getBlockedThread().getName(), wait);
assertEquals("myRoute", thread.getRouteId());
- assertEquals("myAsync", thread.getNodeId());
+ //assertEquals("myAsync", thread.getNodeId());
+ assertEquals("process1", thread.getNodeId());
}
})
.to("mock:result");