You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/22 21:02:04 UTC
svn commit: r777657 -
/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
Author: davsclaus
Date: Fri May 22 19:02:03 2009
New Revision: 777657
URL: http://svn.apache.org/viewvc?rev=777657&view=rev
Log:
ExecutorService that is ordred by the order in which the tasks was submitted. This is to be used by the MulticastProcessor.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java (with props)
Added: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java?rev=777657&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java Fri May 22 19:02:03 2009
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.concurrent;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A {@link java.util.concurrent.CompletionService} that orders the completed tasks
+ * in the same order as they where submitted.
+ *
+ * @version $Revision$
+ */
+public class SubmitOrderedCompletionService<V> implements CompletionService<V> {
+
+ private final Executor executor;
+
+ // the idea to order the completed task in the same order as they where submitted is to leverage
+ // the delay queue. With the delay queue we can control the order by the getDelay and compareTo methods
+ // where we can order the tasks in the same order as they where submitted.
+ private final DelayQueue completionQueue = new DelayQueue();
+
+ // id is the unique id that determines the order in which tasks was submitted (incrementing)
+ private final AtomicInteger id = new AtomicInteger();
+ // index is the index of the next id that should expire and thus be ready to take from the delayed queue
+ private final AtomicInteger index = new AtomicInteger();
+
+ private class SubmitOrderFutureTask<V> extends FutureTask<Void> implements Delayed {
+
+ // the id this task was assigned
+ private final long id;
+
+ public SubmitOrderFutureTask(long id, Callable<Void> voidCallable) {
+ super(voidCallable);
+ this.id = id;
+ }
+
+ public SubmitOrderFutureTask(long id, Runnable runnable, Void result) {
+ super(runnable, result);
+ this.id = id;
+ }
+
+ public long getDelay(TimeUnit unit) {
+ // if the answer is 0 then this task is ready to be taken
+ long answer = id - index.get();
+ return answer;
+ }
+
+ @SuppressWarnings("unchecked")
+ public int compareTo(Delayed o) {
+ SubmitOrderFutureTask other = (SubmitOrderFutureTask) o;
+ int answer = (int) (this.id - other.id);
+ return answer;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void done() {
+ // when we are done add to the completion queue
+ completionQueue.add(this);
+ }
+ }
+
+ public SubmitOrderedCompletionService(Executor executor) {
+ this.executor = executor;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Future<V> submit(Callable task) {
+ if (task == null) {
+ throw new IllegalArgumentException("Task must be provided");
+ }
+ SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task);
+ executor.execute(f);
+ return f;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Future<V> submit(Runnable task, Object result) {
+ if (task == null) {
+ throw new IllegalArgumentException("Task must be provided");
+ }
+ SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task, null);
+ executor.execute(f);
+ return f;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Future<V> take() throws InterruptedException {
+ index.incrementAndGet();
+ return (Future) completionQueue.take();
+ }
+
+ @SuppressWarnings("unchecked")
+ public Future<V> poll() {
+ index.incrementAndGet();
+ return (Future) completionQueue.poll();
+ }
+
+ @SuppressWarnings("unchecked")
+ public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+ index.incrementAndGet();
+ return (Future) completionQueue.poll(timeout, unit);
+ }
+}
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
------------------------------------------------------------------------------
svn:keywords = Rev Date