You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by an...@apache.org on 2016/07/07 20:43:55 UTC

camel git commit: CAMEL-10116: Retrieve last MessageHistory when getting NodeId and RouteId

Repository: camel
Updated Branches:
  refs/heads/master 6483909c2 -> 946c8dffb


CAMEL-10116: Retrieve last MessageHistory when getting NodeId and RouteId


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/946c8dff
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/946c8dff
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/946c8dff

Branch: refs/heads/master
Commit: 946c8dffb69f5bc95392d3e27aeaf5212cd46c87
Parents: 6483909
Author: Arno Noordover <an...@users.noreply.github.com>
Authored: Tue Jul 5 20:24:35 2016 +0200
Committer: Arno Noordover <an...@users.noreply.github.com>
Committed: Thu Jul 7 22:42:00 2016 +0200

----------------------------------------------------------------------
 .../impl/DefaultAsyncProcessorAwaitManager.java |  46 ++++--
 .../org/apache/camel/impl/DefaultExchange.java  |   3 +-
 .../camel/impl/DefaultInflightRepository.java   |  13 +-
 .../camel/processor/CamelInternalProcessor.java |   3 +-
 .../org/apache/camel/util/ExchangeHelper.java   |   4 +-
 .../DefaultAsyncProcessorAwaitManagerTest.java  | 154 +++++++++++++++++++
 .../async/AsyncProcessorAwaitManagerTest.java   |   3 +-
 7 files changed, 201 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 2712178..d0d306b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -18,7 +18,7 @@ package org.apache.camel.impl;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.MessageHistory;
+import org.apache.camel.NamedNode;
 import org.apache.camel.processor.DefaultExchangeFormatter;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ExchangeFormatter;
@@ -240,23 +241,12 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
         private final Exchange exchange;
         private final CountDownLatch latch;
         private final long start;
-        private String routeId;
-        private String nodeId;
 
         private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
             this.thread = thread;
             this.exchange = exchange;
             this.latch = latch;
             this.start = System.currentTimeMillis();
-
-            // capture details from message history if enabled
-            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
-            if (list != null && !list.isEmpty()) {
-                // grab last part
-                MessageHistory history = list.get(list.size() - 1);
-                routeId = history.getRouteId();
-                nodeId = history.getNode() != null ? history.getNode().getId() : null;
-            }
         }
 
         @Override
@@ -276,18 +266,46 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
 
         @Override
         public String getRouteId() {
-            return routeId;
+            MessageHistory lastMessageHistory = getLastMessageHistory();
+            if (lastMessageHistory == null) {
+                return null;
+            }
+            return lastMessageHistory.getRouteId();
         }
 
         @Override
         public String getNodeId() {
-            return nodeId;
+            NamedNode node = getNode();
+            if (node == null) {
+                return null;
+            }
+            return node.getId();
         }
 
         public CountDownLatch getLatch() {
             return latch;
         }
 
+        private NamedNode getNode() {
+            MessageHistory lastMessageHistory = getLastMessageHistory();
+            if (lastMessageHistory == null) {
+                return null;
+            }
+            return lastMessageHistory.getNode();
+        }
+
+        private MessageHistory getLastMessageHistory() {
+            LinkedList<MessageHistory> list = getMessageHistories();
+            if (list == null || list.isEmpty()) {
+                return null;
+            }
+            return list.getLast();
+        }
+
+        private LinkedList<MessageHistory> getMessageHistories() {
+            return exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
+        }
+
         @Override
         public String toString() {
             return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]";

http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
index 0091b6e..87b4ff0 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
@@ -17,6 +17,7 @@
 package org.apache.camel.impl;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -150,7 +151,7 @@ public final class DefaultExchange implements Exchange {
         // safe copy message history using a defensive copy
         List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
         if (history != null) {
-            answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history));
+            answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
         }
 
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index 6b65c24..4d0085a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -194,13 +195,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
         @Override
         @SuppressWarnings("unchecked")
         public long getElapsed() {
-            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
             if (list == null || list.isEmpty()) {
                 return 0;
             }
 
             // get latest entry
-            MessageHistory history = list.get(list.size() - 1);
+            MessageHistory history = list.getLast();
             if (history != null) {
                 return history.getElapsed();
             } else {
@@ -211,13 +212,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
         @Override
         @SuppressWarnings("unchecked")
         public String getNodeId() {
-            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
             if (list == null || list.isEmpty()) {
                 return null;
             }
 
             // get latest entry
-            MessageHistory history = list.get(list.size() - 1);
+            MessageHistory history = list.getLast();
             if (history != null) {
                 return history.getNode().getId();
             } else {
@@ -238,13 +239,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
         @Override
         @SuppressWarnings("unchecked")
         public String getAtRouteId() {
-            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
             if (list == null || list.isEmpty()) {
                 return null;
             }
 
             // get latest entry
-            MessageHistory history = list.get(list.size() - 1);
+            MessageHistory history = list.getLast();
             if (history != null) {
                 return history.getRouteId();
             } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 3c15167..a726b2f 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.RejectedExecutionException;
 
@@ -744,7 +745,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         public MessageHistory before(Exchange exchange) throws Exception {
             List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
             if (list == null) {
-                list = new ArrayList<MessageHistory>();
+                list = new LinkedList<>();
                 exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
             }
             MessageHistory history = factory.newMessageHistory(routeId, definition, new Date());

http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 9641026..030b78d 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -16,8 +16,8 @@
  */
 package org.apache.camel.util;
 
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -915,7 +915,7 @@ public final class ExchangeHelper {
         // safe copy message history using a defensive copy
         List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
         if (history != null) {
-            answer.put(Exchange.MESSAGE_HISTORY, new ArrayList<MessageHistory>(history));
+            answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
         }
 
         return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
new file mode 100644
index 0000000..b91354e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManagerTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.MessageHistory;
+import org.apache.camel.NamedNode;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.MessageHistoryFactory;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+
+public class DefaultAsyncProcessorAwaitManagerTest {
+
+    private static final MessageHistoryFactory MESSAGE_HISTORY_FACTORY = new DefaultMessageHistoryFactory();
+    private DefaultAsyncProcessorAwaitManager defaultAsyncProcessorAwaitManager;
+    private DefaultExchange exchange;
+    private CountDownLatch latch;
+    private Thread thread;
+
+    @Test
+    public void testNoMessageHistory() throws Exception {
+        startAsyncProcess();
+        AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+        assertThat(awaitThread.getRouteId(), is(nullValue()));
+        assertThat(awaitThread.getNodeId(), is(nullValue()));
+        waitForEndOfAsyncProcess();
+    }
+
+    @Test
+    public void testMessageHistoryWithEmptyList() throws Exception {
+        startAsyncProcess();
+        exchange.setProperty(Exchange.MESSAGE_HISTORY, new LinkedList<MessageHistory>());
+        AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+        assertThat(awaitThread.getRouteId(), is(nullValue()));
+        assertThat(awaitThread.getNodeId(), is(nullValue()));
+        waitForEndOfAsyncProcess();
+    }
+
+    @Test
+    public void testMessageHistoryWithNullMessageHistory() throws Exception {
+        startAsyncProcess();
+        LinkedList<MessageHistory> messageHistories = new LinkedList<>();
+        messageHistories.add(null);
+        exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
+        AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+        assertThat(awaitThread.getRouteId(), is(nullValue()));
+        assertThat(awaitThread.getNodeId(), is(nullValue()));
+        waitForEndOfAsyncProcess();
+    }
+
+    @Test
+    public void testMessageHistoryWithNullElements() throws Exception {
+        startAsyncProcess();
+        LinkedList<MessageHistory> messageHistories = new LinkedList<>();
+        messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory(null,
+                new MockNamedNode().withId(null),
+                null));
+        exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
+        AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+        assertThat(awaitThread.getRouteId(), is(nullValue()));
+        assertThat(awaitThread.getNodeId(), is(nullValue()));
+        waitForEndOfAsyncProcess();
+    }
+
+    @Test
+    public void testMessageHistoryWithNotNullElements() throws Exception {
+        startAsyncProcess();
+        LinkedList<MessageHistory> messageHistories = new LinkedList<>();
+        messageHistories.add(MESSAGE_HISTORY_FACTORY.newMessageHistory("routeId",
+                new MockNamedNode().withId("nodeId"),
+                null));
+        exchange.setProperty(Exchange.MESSAGE_HISTORY, messageHistories);
+        AsyncProcessorAwaitManager.AwaitThread awaitThread = defaultAsyncProcessorAwaitManager.browse().iterator().next();
+        assertThat(awaitThread.getRouteId(), is("routeId"));
+        assertThat(awaitThread.getNodeId(), is("nodeId"));
+        waitForEndOfAsyncProcess();
+    }
+
+    private void waitForEndOfAsyncProcess() throws InterruptedException {
+        latch.countDown();
+        // block until the await thread has exited instead of spinning on isAlive()
+        thread.join();
+    }
+
+    private void startAsyncProcess() throws InterruptedException {
+        defaultAsyncProcessorAwaitManager = new DefaultAsyncProcessorAwaitManager();
+        latch = new CountDownLatch(1);
+        exchange = new DefaultExchange(new DefaultCamelContext());
+        thread = new Thread(new BackgroundAwait());
+        thread.start();
+        for (int i = 0; i < 500 && defaultAsyncProcessorAwaitManager.browse().isEmpty(); i++) {
+            Thread.sleep(10); // poll (at most ~5s) until the await thread has registered itself
+        }
+    }
+
+    private class BackgroundAwait implements Runnable {
+
+        @Override
+        public void run() {
+            defaultAsyncProcessorAwaitManager.await(exchange, latch);
+        }
+    }
+
+    private static class MockNamedNode implements NamedNode {
+
+        private String id;
+
+        @Override
+        public String getId() {
+            return id;
+        }
+
+        @Override
+        public String getShortName() {
+            return this.getClass().getSimpleName();
+        }
+
+        @Override
+        public String getLabel() {
+            return this.getClass().getName();
+        }
+
+        @Override
+        public String getDescriptionText() {
+            return this.getClass().getCanonicalName();
+        }
+
+        public MockNamedNode withId(String id) {
+            this.id = id;
+            return this;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/946c8dff/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
index 1fc4635..b56b279 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -73,7 +73,8 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
                                 log.info("Thread {} has waited for {} msec.", thread.getBlockedThread().getName(), wait);
 
                                 assertEquals("myRoute", thread.getRouteId());
-                                assertEquals("myAsync", thread.getNodeId());
+                                //assertEquals("myAsync", thread.getNodeId());
+                                assertEquals("process1", thread.getNodeId());
                             }
                         })
                         .to("mock:result");