You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/01 07:03:19 UTC

svn commit: r561660 [1/2] - in /incubator/servicemix/trunk: core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/ core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/ deployables/serviceengines/servicemix-eip/src/main/java/or...

Author: gnodet
Date: Tue Jul 31 22:03:17 2007
New Revision: 561660

URL: http://svn.apache.org/viewvc?view=rev&rev=561660
Log:
SM-1008: Reseqencer pattern for EIP component. Patch provided by Martin Krasser, thx\!

Added:
    incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java
    incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/TestComparator.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/TestComparatorTest.java
    incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/TestObject.java

Added: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.ByteArrayDataSource;
+import org.apache.servicemix.jbi.util.FileUtil;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+/**
+ * A thread-safe copier for NormalizedMessage onjects.
+ * 
+ * @author Martin Krasser
+ */
+public class MessageCopier {
+
+    private boolean copySubject;
+    private boolean copyContent;
+    private boolean copyProperties;
+    private boolean copyAttachments;
+
+    /**
+     * Creates a new message copier instance that creates full (deep) message
+     * copies.
+     */
+    public MessageCopier() {
+        this(true, true, true, true);
+    }
+    
+    /**
+     * Create a new message copier instance.
+     * 
+     * @param copySubject <code>true</code> if subject shall be copied.
+     * @param copyContent <code>true</code> if content shall be copied
+     * @param copyProperties <code>true</code> if properties shall be copied 
+     * @param copyAttachments <code>true</code> if attachments shall be copied
+     */
+    public MessageCopier(boolean copySubject, boolean copyContent, boolean copyProperties, boolean copyAttachments) {
+        super();
+        this.copySubject = copySubject;
+        this.copyContent = copyContent;
+        this.copyProperties = copyProperties;
+        this.copyAttachments = copyAttachments;
+    }
+
+    /**
+     * Copies messages under consideration of the <code>copySubject</code>,
+     * <code>copyContent</code>, <code>copyProperties</code>,
+     * <code>copyAttachments</code> properties.
+     * 
+     * @param message original message.
+     * @return a copy of the original message.
+     * @throws MessagingException if a system-level exception occurs.
+     */
+    public NormalizedMessage copy(NormalizedMessage message) throws MessagingException {
+        NormalizedMessage copy = new MessageUtil.NormalizedMessageImpl();
+        if (copySubject) {
+            copySubject(message, copy);
+        }
+        if (copyContent) {
+            copyContent(message, copy);
+        }
+        if (copyProperties) {
+            copyProperties(message, copy);
+        }
+        if (copyAttachments) {
+            copyAttachments(message, copy);
+        }
+        return copy;
+    }
+    
+    public boolean isCopyAttachments() {
+        return copyAttachments;
+    }
+
+    public boolean isCopyContent() {
+        return copyContent;
+    }
+
+    public boolean isCopyProperties() {
+        return copyProperties;
+    }
+
+    public boolean isCopySubject() {
+        return copySubject;
+    }
+
+    private static void copySubject(NormalizedMessage from, NormalizedMessage to) {
+        to.setSecuritySubject(from.getSecuritySubject());
+    }
+    
+    private static void copyContent(NormalizedMessage from, NormalizedMessage to) throws MessagingException{
+        String str = null; 
+        try {
+            str = new SourceTransformer().toString(from.getContent());
+        } catch (Exception e) {
+            throw new MessagingException(e);
+        }
+        if (str != null) {
+            to.setContent(new StringSource(str));
+        }
+    }
+    
+    private static void copyProperties(NormalizedMessage from, NormalizedMessage to) {
+        for (Object name : from.getPropertyNames()) {
+            to.setProperty((String)name, from.getProperty((String)name));
+        }
+    }
+    
+    private static void copyAttachments(NormalizedMessage from, NormalizedMessage to) throws MessagingException{
+        for (Object name : from.getAttachmentNames()) {
+            DataHandler handler = from.getAttachment((String)name);
+            DataSource source = handler.getDataSource();
+            if (source instanceof ByteArrayDataSource == false) {
+                DataSource copy = copyDataSource(source);
+                handler = new DataHandler(copy);
+            }
+            to.addAttachment((String)name, handler);
+        }
+    }
+    
+    private static DataSource copyDataSource(DataSource source) throws MessagingException {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            FileUtil.copyInputStream(source.getInputStream(), baos);
+            ByteArrayDataSource bads = new ByteArrayDataSource(baos.toByteArray(), source.getContentType());
+            bads.setName(source.getName());
+            return bads;
+        } catch (IOException e) {
+            throw new MessagingException(e);
+        }
+    }
+    
+}

Added: incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.util;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.security.auth.Subject;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.ByteArrayDataSource;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+public class MessageCopierTest extends TestCase {
+
+    private NormalizedMessage message;
+    
+    private Subject subject;
+    
+    public void setUp() throws Exception {
+        subject = new Subject();
+        message = new MessageUtil.NormalizedMessageImpl();
+        message.setContent(new StringSource("<doc>s1<doc>"));
+        message.addAttachment("a", new DataHandler(createDataSource("s2")));
+        message.setProperty("p", "s3");
+        message.setSecuritySubject(subject);
+    }
+
+    public void tearDown() throws Exception {
+    }
+
+    public void testCopyContent() throws Exception {
+        MessageCopier copier = new MessageCopier(false, true, false, false);
+        NormalizedMessage copy = copier.copy(message);
+        String content = new SourceTransformer().toString(copy.getContent());
+        assertEquals("wrong content", "<doc>s1<doc>", content);
+        assertEquals("wrong attachment", null, copy.getAttachment("a"));
+        assertEquals("wrong property", null, copy.getProperty("p"));
+        assertEquals("wrong subject", null, copy.getSecuritySubject());
+    }
+    
+    public void testCopyAttachment() throws Exception {
+        MessageCopier copier = new MessageCopier(false, false, false, true); 
+        NormalizedMessage copy = copier.copy(message);
+        String attachment = IOUtils.toString(copy.getAttachment("a").getInputStream());
+        assertEquals("wrong content", null, copy.getContent());
+        assertEquals("wrong attachment", "s2", attachment);
+        assertEquals("wrong property", null, copy.getProperty("p"));
+        assertEquals("wrong subject", null, copy.getSecuritySubject());
+    }
+    
+    public void testCopyProperties() throws Exception {
+        MessageCopier copier = new MessageCopier(false, false, true, false); 
+        NormalizedMessage copy = copier.copy(message);
+        assertEquals("wrong content", null, copy.getContent());
+        assertEquals("wrong attachment", null, copy.getAttachment("a"));
+        assertEquals("wrong property", "s3", copy.getProperty("p"));
+        assertEquals("wrong subject", null, copy.getSecuritySubject());
+    }
+    
+    public void testCopySubject() throws Exception {
+        MessageCopier copier = new MessageCopier(true, false, false, false); 
+        NormalizedMessage copy = copier.copy(message);
+        assertEquals("wrong content", null, copy.getContent());
+        assertEquals("wrong attachment", null, copy.getAttachment("a"));
+        assertEquals("wrong property", null, copy.getProperty("p"));
+        assertEquals("wrong subject", subject, copy.getSecuritySubject());
+    }
+    
+    private static DataSource createDataSource(String text) {
+        return new ByteArrayDataSource(text.getBytes(), "text/plain");
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.eip.support.resequence.DefaultComparator;
+import org.apache.servicemix.eip.support.resequence.ResequencerBase;
+import org.apache.servicemix.eip.support.resequence.ResequencerEngine;
+import org.apache.servicemix.eip.support.resequence.SequenceElementComparator;
+import org.apache.servicemix.eip.support.resequence.SequenceReader;
+import org.apache.servicemix.eip.support.resequence.SequenceSender;
+import org.apache.servicemix.executors.Executor;
+
+/**
+ * @author Martin Krasser
+ * 
+ * @org.apache.xbean.XBean element="resequencer"
+ */
+public class Resequencer extends ResequencerBase implements SequenceSender {
+
+    private ResequencerEngine<MessageExchange> reseq;
+    
+    private SequenceReader reader;
+
+    private Executor executor;
+
+    private int capacity;
+    
+    private long timeout;
+    
+    private SequenceElementComparator<MessageExchange> comparator;
+    
+    public Resequencer() {
+        this.reader = new SequenceReader(this);
+        this.comparator = new DefaultComparator();
+    }
+    
+    public void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+    
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    public void setComparator(SequenceElementComparator<MessageExchange> comparator) {
+        this.comparator = comparator;
+    }
+    
+    @Override
+    public void start() throws Exception {
+        super.start();
+        if (executor == null) {
+            executor = getServiceUnit().getComponent().getExecutor();
+        }
+        BlockingQueue<MessageExchange> queue = new LinkedBlockingQueue<MessageExchange>();
+        reseq = new ResequencerEngine<MessageExchange>(comparator, capacity);
+        reseq.setTimeout(timeout);
+        reseq.setOutQueue(queue);
+        reader.setQueue(queue);
+        reader.start(executor);
+    }
+
+    @Override
+    public void stop() throws Exception {
+        reseq.stop();
+        reader.stop();
+        super.stop();
+    }
+    
+    public void sendSync(MessageExchange exchange) throws MessagingException {
+        super.sendSync(exchange);
+    }
+    
+    public void sendSync(List<MessageExchange> exchanges) throws MessagingException {
+        for (MessageExchange exchange : exchanges) {
+            sendSync(exchange);
+        }
+    }
+
+    @Override
+    protected void processSync(MessageExchange exchange) throws Exception {
+        fail(exchange, new UnsupportedOperationException("synchronous resequencing not supported"));
+    }
+
+    @Override
+    protected void processAsync(MessageExchange exchange) throws Exception {
+        validateMessageExchange(exchange);
+        if (exchange.getStatus() == ExchangeStatus.DONE) {
+            return;
+        } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            return;
+        } else if (exchange.getFault() != null) {
+            done(exchange);
+            return;
+        }
+        processMessage(exchange);
+        done(exchange);
+    }
+
+    private void processMessage(MessageExchange sourceExchange) throws MessagingException, InterruptedException {
+        NormalizedMessage source = sourceExchange.getMessage("in");
+        NormalizedMessage copy = getMessageCopier().copy(source);
+        MessageExchange targetExchange = createTargetExchange(copy, sourceExchange.getPattern());
+        // add target exchange to resequencer (blocking if capacity is reached)
+        reseq.put(targetExchange);
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import javax.jbi.messaging.MessageExchange;
+
+/**
+ * Compares {@link MessageExchange} sequence elements based on sequence numbers
+ * defined by their in-{@link NormalizedMessage}s. This comparator works on
+ * sequence numbers of type {@link Long}. Sequence numbers must be stored as
+ * {@link NormalizedMessage} properties. The property name under which the
+ * sequence number is stored is configured via this comparator's
+ * <code>sequenceNumberKey</code> property.
+ * 
+ * @author Martin Krasser
+ * 
+ * @org.apache.xbean.XBean element="default-comparator"
+ */
+public class DefaultComparator implements SequenceElementComparator<MessageExchange> {
+
+    public static final String SEQUENCE_NUMBER_KEY = "org.apache.servicemix.eip.sequence.number";
+    
+    private static final String IN = "in";
+    
+    private String sequenceNumberKey;
+    
+    private boolean sequenceNumberAsString;
+    
+    public DefaultComparator() {
+        sequenceNumberKey = SEQUENCE_NUMBER_KEY;
+        sequenceNumberAsString = false;
+    }
+    
+    public String getSequenceNumberKey() {
+        return sequenceNumberKey;
+    }
+
+    public void setSequenceNumberKey(String sequenceNumberPropertyName) {
+        this.sequenceNumberKey = sequenceNumberPropertyName;
+    }
+
+    public boolean isSequenceNumberAsString() {
+        return sequenceNumberAsString;
+    }
+
+    public void setSequenceNumberAsString(boolean sequenceNumberAsString) {
+        this.sequenceNumberAsString = sequenceNumberAsString;
+    }
+
+    public boolean predecessor(MessageExchange o1, MessageExchange o2) {
+        long n1 = getSequenceNumber(o1).longValue();
+        long n2 = getSequenceNumber(o2).longValue();
+        return n1 == (n2 - 1L);
+    }
+
+    public boolean successor(MessageExchange o1, MessageExchange o2) {
+        long n1 = getSequenceNumber(o1).longValue();
+        long n2 = getSequenceNumber(o2).longValue();
+        return n2 == (n1 - 1L);
+    }
+
+    public int compare(MessageExchange o1, MessageExchange o2) {
+        Long n1 = getSequenceNumber(o1);
+        Long n2 = getSequenceNumber(o2);
+        return n1.compareTo(n2);
+    }
+
+    private Long getSequenceNumber(MessageExchange exchange) {
+        Object number = exchange.getMessage(IN).getProperty(sequenceNumberKey);
+        if (sequenceNumberAsString) {
+            return new Long((String)number);
+        } else {
+            return (Long)number;
+        }
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+/**
+ * A container for objects to be resequenced. This container can be scheduled
+ * for timing out. Non-scheduled objects or already timed-out objects are ready
+ * for being released by the {@link ResequencerEngine}.
+ * 
+ * @author Martin Krasser
+ */
+class Element<E> implements TimeoutHandler {
+
+    /**
+     * The contained object.
+     */
+    private E object;
+
+    /**
+     * Not <code>null</code> if this element is currently beeing scheduled for
+     * timing out.
+     */
+    private Timeout timeout;
+    
+    /**
+     * Creates a new container instance.
+     * 
+     * @param object contained object.
+     */
+    public Element(E object) {
+        this.object = object;
+    }
+    
+    /**
+     * Returns the contained object.
+     * 
+     * @return the contained object.
+     */
+    public E getObject() {
+        return object;
+    }
+
+    /**
+     * Returns <code>true</code> if this element is currently scheduled for
+     * timing out.
+     * 
+     * @return <code>true</code> if scheduled or <code>false</code> if not
+     *         scheduled or already timed-out.
+     */
+    public synchronized boolean scheduled() {
+        return timeout != null;
+    }
+    
+    /**
+     * Schedules the given timeout task. Before this methods calls the
+     * {@link Timeout#schedule()} method it adds this element as timeout
+     * listener.
+     * 
+     * @param t a timeout task.
+     */
+    public synchronized void schedule(Timeout t) {
+        this.timeout = t;
+        this.timeout.addTimeoutHandlerFirst(this);
+        this.timeout.schedule();
+    }
+    
+    /**
+     * Cancels the scheduled timeout for this element. If this element is not
+     * scheduled or has already timed-out this method has no effect.
+     */
+    public synchronized void cancel() {
+        if (timeout != null) {
+            timeout.cancel();
+        }
+        timeout(null);
+    }
+
+    /**
+     * Marks this element as timed-out.
+     * 
+     * @param t timeout task that caused the notification.
+     */
+    public synchronized void timeout(Timeout t) {
+        this.timeout = null;
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+/**
+ * A strategy for comparing {@link Element} instances. This strategy uses
+ * another {@link SequenceElementComparator} instance for comparing elements
+ * contained by {@link Element} instances.
+ * 
+ * @author Martin Krasser
+ */
+class ElementComparator<E> implements SequenceElementComparator<Element<E>> {
+
+    /**
+     * A sequence element comparator this comparator delegates to.
+     */
+    private SequenceElementComparator<E> comparator;
+    
+    /**
+     * Creates a new element comparator instance.
+     * 
+     * @param comparator a sequence element comparator this comparator delegates
+     *        to.
+     */
+    public ElementComparator(SequenceElementComparator<E> comparator) {
+        this.comparator = comparator;
+    }
+    
+    /**
+     * @see com.icw.ehf.integration.common.collection.SequenceElementComparator#predecessor(java.lang.Object, java.lang.Object)
+     */
+    public boolean predecessor(Element<E> o1, Element<E> o2) {
+        return comparator.predecessor(o1.getObject(), o2.getObject());
+    }
+
+    /**
+     * @see com.icw.ehf.integration.common.collection.SequenceElementComparator#successor(java.lang.Object, java.lang.Object)
+     */
+    public boolean successor(Element<E> o1, Element<E> o2) {
+        return comparator.successor(o1.getObject(), o2.getObject());
+    }
+
+    /**
+     * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+     */
+    public int compare(Element<E> o1, Element<E> o2) {
+        return comparator.compare(o1.getObject(), o2.getObject());
+    }
+
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.net.URI;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.jbi.util.MessageCopier;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+public abstract class ResequencerBase extends EIPEndpoint {
+
+    private MessageCopier messageCopier = new MessageCopier();
+
+    private ExchangeTarget target;
+    
+    public MessageCopier getMessageCopier() {
+        return messageCopier;
+    }
+
+    public ExchangeTarget getTarget() {
+        return target;
+    }
+
+    public void setTarget(ExchangeTarget target) {
+        this.target = target;
+    }
+    
+    @Override
+    public void validate() throws DeploymentException {
+        super.validate();
+        if (target == null) {
+            throw new IllegalArgumentException("target must be set to a valid ExchangeTarget");
+        }
+    }
+    
+    public void validateMessageExchange(MessageExchange exchange) throws MessagingException {
+        if ((exchange instanceof InOnly) || (exchange instanceof RobustInOnly)) {
+            return;
+        }
+        fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+    }
+
+    protected MessageExchange createTargetExchange(NormalizedMessage message, URI exchangePattern) throws MessagingException {
+        MessageExchange targetExchange = getExchangeFactory().createExchange(exchangePattern);
+        target.configureTarget(targetExchange, getContext());
+        MessageUtil.transferToIn(message, targetExchange);
+        return targetExchange;
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.Queue;
+import java.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Resequences elements based on a given {@link SequenceElementComparator}.
+ * This resequencer is designed for resequencing element streams. Resequenced
+ * elements are added to an output {@link Queue}. The resequencer is configured
+ * via the <code>timeout</code> and <code>capacity</code> properties.
+ * 
+ * <ul>
+ * <li><code>timeout</code>. Defines the timeout (in milliseconds) for a
+ * given element managed by this resequencer. An out-of-sequence element can
+ * only be marked as <i>ready-for-delivery</i> if it either times out or if it
+ * has an immediate predecessor (in that case it is in-sequence). If an
+ * immediate predecessor of a waiting element arrives the timeout task for the
+ * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
+ * <p>
+ * If the maximum out-of-sequence time between elements within a stream is
+ * known, the <code>timeout</code> value should be set to this value. In this
+ * case it is guaranteed that all elements of a stream will be delivered in
+ * sequence to the output queue. However, large <code>timeout</code> values
+ * might require a very high resequencer <code>capacity</code> which might be
+ * in conflict with available memory resources. The lower the
+ * <code>timeout</code> value is compared to the out-of-sequence time between
+ * elements within a stream the higher the probability is for out-of-sequence
+ * elements delivered by this resequencer.</li>
+ * <li><code>capacity</code>. The capacity of this resequencer.</li>
+ * </ul>
+ * 
+ * Whenever a timeout for a certain element occurs or an element has been added
+ * to this resequencer a delivery attempt is started. If a (sub)sequence of
+ * elements is <i>ready-for-delivery</i> then they are added to output queue.
+ * <p>
+ * The resequencer remembers the last-delivered element. If an element arrives
+ * which is the immediate successor of the last-delivered element it will be
+ * delivered immediately and the last-delivered element is adjusted accordingly.
+ * If the last-delivered element is <code>null</code> i.e. the resequencer was
+ * newly created the first arriving element will wait <code>timeout</code>
+ * milliseconds for being delivered to the output queue.
+ * 
+ * @author Martin Krasser
+ */
+public class ResequencerEngine<E> implements TimeoutHandler {
+
+    private static final Log LOG = LogFactory.getLog(ResequencerEngine.class);
+    
+    private long timeout;
+    
+    private int capacity;
+    
+    private Queue<E> outQueue;
+    
+    private Element<E> lastDelivered;
+
+    /**
+     * A sequence of elements for sorting purposes.
+     */
+    private Sequence<Element<E>> sequence;
+    
+    /**
+     * A timer for scheduling timeout notifications.
+     */
+    private Timer timer;
+    
+    /**
+     * Creates a new resequencer instance with a default timeout of 2000
+     * milliseconds. The capacity is set to {@link Integer#MAX_VALUE}.
+     * 
+     * @param comparator a sequence element comparator.
+     */
+    public ResequencerEngine(SequenceElementComparator<E> comparator) {
+        this(comparator, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a new resequencer instance with a default timeout of 2000
+     * milliseconds.
+     * 
+     * @param comparator a sequence element comparator.
+     * @param capacity the capacity of this resequencer.
+     */
+    public ResequencerEngine(SequenceElementComparator<E> comparator, int capacity) {
+        this.timer = new Timer("Resequencer Timer");
+        this.sequence = createSequence(comparator);
+        this.capacity = capacity;
+        this.timeout = 2000L;
+        this.lastDelivered = null;
+    }
+    
+    /**
+     * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
+     */
+    public void stop() {
+        this.timer.cancel();
+    }
+    
+    /**
+     * Returns the output queue.
+     * 
+     * @return the output queue.
+     */
+    public Queue<E> getOutQueue() {
+        return outQueue;
+    }
+
+    /**
+     * Sets the output queue.
+     * 
+     * @param outQueue output queue.
+     */
+    public void setOutQueue(Queue<E> outQueue) {
+        this.outQueue = outQueue;
+    }
+
+    /**
+     * Returns this resequencer's timeout value.
+     * 
+     * @return the timeout in milliseconds.
+     */
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets this sequencer's timeout value.
+     * 
+     * @param timeout the timeout in milliseconds.
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    /** 
+     * Handles a timeout notification by starting a delivery attempt.
+     * 
+     * @param timout timeout task that caused the notification.
+     */
+    public synchronized void timeout(Timeout timout) {
+        try {
+            while (deliver()) {
+                // work done in deliver()
+            }
+        } catch (RuntimeException e) {
+            LOG.error("error during delivery", e);
+        }
+    }
+
+    /**
+     * Adds an element to this resequencer throwing an exception if the maximum
+     * capacity is reached.
+     * 
+     * @param o element to be resequenced.
+     * @throws IllegalStateException if the element cannot be added at this time
+     *         due to capacity restrictions.
+     */
+    public synchronized void add(E o) {
+        if (sequence.size() >= capacity) {
+            throw new IllegalStateException("maximum capacity is reached");
+        }
+        insert(o);
+    }
+    
+    /**
+     * Adds an element to this resequencer waiting, if necessary, until capacity
+     * becomes available.
+     * 
+     * @param o element to be resequenced.
+     * @throws InterruptedException if interrupted while waiting.
+     */
+    public synchronized void put(E o) throws InterruptedException {
+        if (sequence.size() >= capacity) {
+            wait();
+        }
+        insert(o);
+    }
+    
+    /**
+     * Returns the last delivered element.
+     * 
+     * @return the last delivered element or <code>null</code> if no delivery
+     *         has been made yet.
+     */
+    E getLastDelivered() {
+        if (lastDelivered == null) {
+            return null;
+        }
+        return lastDelivered.getObject();
+    }
+    
+    /**
+     * Sets the last delivered element. This is for testing purposes only.
+     * 
+     * @param o an element.
+     */
+    void setLastDelivered(E o) {
+        lastDelivered = new Element<E>(o);
+    }
+    
+    /**
+     * Inserts the given element into this resequencing queue (sequence). If the
+     * element is not ready for immediate delivery and has no immediate
+     * presecessor then it is scheduled for timing out. After being timed out it
+     * is ready for delivery.
+     * 
+     * @param o an element.
+     */
+    private void insert(E o) {
+        // wrap object into internal element
+        Element<E> element = new Element<E>(o);
+        // add element to sequence in proper order
+        sequence.add(element);
+
+        Element<E> successor = sequence.successor(element);
+        
+        // check if there is an immediate successor and cancel
+        // timer task (no need to wait any more for timeout)
+        if (successor != null) {
+            successor.cancel();
+        }
+        
+        // start delivery if current element is successor of last delivered element
+        if (successorOfLastDelivered(element)) {
+            // nothing to schedule
+        } else if (sequence.predecessor(element) != null) {
+            // nothing to schedule
+        } else {
+            Timeout t = defineTimeout();
+            element.schedule(t);
+        }
+        
+        // start delivery
+        while (deliver()) {
+            // work done in deliver()
+        }
+    }
+    
+    /**
+     * Attempts to deliver a single element from the head of the resequencer
+     * queue (sequence). Only elements which have not been scheduled for timing
+     * out or which already timed out can be delivered.
+     * 
+     * @return <code>true</code> if the element has been delivered
+     *         <code>false</code> otherwise.
+     */
+    private boolean deliver() {
+        if (sequence.size() == 0) {
+            return false;
+        }
+        // inspect element with lowest sequence value
+        Element<E> element = sequence.first();
+        
+        // if element is scheduled do not deliver and return
+        if (element.scheduled()) {
+            return false;
+        }
+        
+        // remove deliverable element from sequence
+        sequence.remove(element);
+
+        // set the delivered element to last delivered element
+        lastDelivered = element;
+        
+        // notify a waiting thread that capacity is available
+        notify();
+        
+        // add element to output queue
+        outQueue.add(element.getObject());
+
+        // element has been delivered
+        return true;
+    }
+    
+    /**
+     * Returns <code>true</code> if the given element is the immediate
+     * successor of the last delivered element.
+     * 
+     * @param element an element.
+     * @return <code>true</code> if the given element is the immediate
+     *         successor of the last delivered element.
+     */
+    private boolean successorOfLastDelivered(Element<E> element) {
+        if (lastDelivered == null) {
+            return false;
+        }
+        if (sequence.comparator().successor(element, lastDelivered)) {
+            return true;
+        }
+        return false;
+    }
+    
+    /**
+     * Creates a timeout task based on the timeout setting of this resequencer.
+     * 
+     * @return a new timeout task.
+     */
+    private Timeout defineTimeout() {
+        Timeout result = new Timeout(timer, timeout);
+        result.addTimeoutHandler(this);
+        return result;
+    }
+    
+    private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
+        return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.TreeSet;
+
+/**
+ * A sorted set of elements with additional methods for obtaining immediate
+ * successors and immediate predecessors of a given element in the sequence.
+ * Successors and predecessors are calculated by using a
+ * {@link SequenceElementComparator}.
+ * 
+ * @author Martin Krasser
+ */
+public class Sequence<E> extends TreeSet<E> {
+
+    private static final long serialVersionUID = 5647393631147741711L;
+
+    private SequenceElementComparator<E> comparator;
+    
+    /**
+     * Creates a new {@link Sequence} instance.
+     * 
+     * @param comparator a strategy for comparing elements of this sequence.
+     */
+    public Sequence(SequenceElementComparator<E> comparator) {
+        super(comparator);
+        this.comparator = comparator;
+    }
+    
+    /**
+     * Returns the immediate predecessor of the given element in this sequence
+     * or <code>null</code> if no predecessor exists.
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E predecessor(E e) {
+        E elem = lower(e);
+        if (elem == null) {
+            return null;
+        }
+        if (comparator.predecessor(elem, e)) {
+            return elem;
+        }
+        return null;
+    }
+    
+    /**
+     * Returns the immediate successor of the given element in this sequence
+     * or <code>null</code> if no successor exists.
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E successor(E e) {
+        E elem = higher(e);
+        if (elem == null) {
+            return null;
+        }
+        if (comparator.successor(elem, e)) {
+            return elem;
+        }
+        return null;
+    }
+    
+    /**
+     * Returns this sequence's comparator.
+     * 
+     * @return this sequence's comparator.
+     */
+    public SequenceElementComparator<E> comparator() {
+        return comparator;
+    }
+
+    /**
+     * Returns the next higher element in the sequence to the given element. If
+     * the given element doesn't exist or if it is the last element in the
+     * sequence <code>null</code> is returned. <strong>Please note that this
+     * method is provided for compatibility with Java 5 SE. On a Java 6 SE
+     * platform the same method implemented by the {@link TreeSet}
+     * class should be used for better performance.</strong>
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E higher(E e) {
+        boolean found = false;
+        for (E current : this) {
+            if (found) {
+                return current;
+            }
+            if (comparator.compare(e, current) == 0) {
+                found = true;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the next lower element in the sequence to the given element. If
+     * the given element doesn't exist or if it is the first element in the
+     * sequence <code>null</code> is returned. <strong>Please note that this
+     * method is provided for compatibility with Java 5 SE. On a Java 6 SE
+     * platform the same method implemented by the {@link TreeSet}
+     * class should be used for better performance.</strong>
+     * 
+     * @param e an element which is compared to elements of this sequence.
+     * @return an element of this sequence or <code>null</code>.
+     */
+    public E lower(E e) {
+        E last = null;
+        for (E current : this) {
+            if (comparator.compare(e, current) == 0) {
+                return last;
+            }
+            last = current;
+        }
+        return last;
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.Comparator;
+
+/**
+ * A strategy for comparing elements of a sequence.
+ * 
+ * @author Martin Krasser
+ */
+public interface SequenceElementComparator<E> extends Comparator<E> {
+
+    /**
+     * Returns <code>true</code> if <code>o1</code> is an immediate predecessor
+     * of <code>o2</code>.
+     * 
+     * @param o1 a sequence element.
+     * @param o2 a sequence element.
+     */
+    boolean predecessor(E o1, E o2);
+    
+    /**
+     * Returns <code>true</code> if <code>o1</code> is an immediate successor
+     * of <code>o2</code>.
+     * 
+     * @param o1 a sequence element.
+     * @param o2 a sequence element.
+     */
+    boolean successor(E o1, E o2);
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.BlockingQueue;
+
+import javax.jbi.messaging.MessageExchange;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.executors.Executor;
+
+/**
+ * @author Martin Krasser
+ */
+public class SequenceReader implements Runnable {
+
+    private static final Log LOG = LogFactory.getLog(SequenceReader.class);
+    
+    private static final MessageExchange STOP = createStopSignal();
+    
+    private BlockingQueue<MessageExchange> queue;
+    
+    private SequenceSender sender;
+    
+    public SequenceReader(SequenceSender sender) {
+        this.sender = sender;
+    }
+    
+    public void setQueue(BlockingQueue<MessageExchange> queue) {
+        this.queue = queue;
+    }
+
+    public void run() {
+        while (true) {
+            try {
+                // block until message exchange is available
+                MessageExchange me = queue.take();
+                if (me == STOP) {
+                    LOG.info("exit processing loop after cancellation");
+                    return;
+                }
+                // send sync to preserve message order
+                sender.sendSync(me);
+            } catch (InterruptedException e) {
+                LOG.info("exit processing loop after interrupt");
+                return;
+            } catch (Exception e) {
+                // TODO: handle sendSync errors and faults
+                LOG.error("caught and ignored exception", e);
+            }
+        }
+    }
+    
+    public void start(Executor executor) {
+        executor.execute(this);
+    }
+    
+    public void stop() throws InterruptedException {
+        queue.put(STOP);
+    }
+    
+    private static MessageExchange createStopSignal() {
+        return (MessageExchange)Proxy.newProxyInstance(SequenceReader.class.getClassLoader(), 
+                new Class[] {MessageExchange.class}, createStopHandler());
+    }
+    
+    private static InvocationHandler createStopHandler() {
+        return new InvocationHandler() {
+            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+                throw new IllegalStateException("illegal method invocation on stop signal");
+            }
+        };
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.List;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+/**
+ * A sender for synchronously sending message exchanges.
+ * 
+ * @author Martin Krasser
+ */
+public interface SequenceSender {
+
+    /**
+     * Synchronously sends a single message exchange.
+     * 
+     * @param exchange a message exchange.
+     * @throws MessagingException if a system-level error occurs.
+     */
+    void sendSync(MessageExchange exchange) throws MessagingException;
+    
+    /**
+     * Synchronously sends a list of message exchanges in the order given by the
+     * argument list.
+     * 
+     * @param exchanges a list of message exchanges.
+     * @throws MessagingException if a system-level error occurs.
+     */
+    void sendSync(List<MessageExchange> exchanges) throws MessagingException;
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * A timer task that notifies handlers about scheduled timeouts.
+ * 
+ * @see Timer
+ * @see TimerTask
+ * 
+ * @author Martin Krasser
+ */
+public class Timeout extends TimerTask {
+    
+    private List<TimeoutHandler> timeoutHandlers;
+    
+    private Timer timer;
+    
+    private long timeout;
+    
+    /**
+     * Creates a new timeout task using the given {@link Timer} instance a timeout value. The
+     * task is not scheduled immediately. It will be scheduled by calling this
+     * task's {@link #schedule()} method.
+     * 
+     * @param timer
+     * @param timeout
+     */
+    public Timeout(Timer timer, long timeout) {
+        this.timeoutHandlers = new LinkedList<TimeoutHandler>();
+        this.timeout = timeout;
+        this.timer = timer;
+    }
+
+    /**
+     * Returns the list of timeout handlers that have been registered for
+     * notification.
+     * 
+     * @return the list of timeout handlers
+     */
+    public List<TimeoutHandler> getTimeoutHandlers() {
+        return timeoutHandlers;
+    }
+    
+    /**
+     * Appends a new timeout handler at the end of the timeout handler list.
+     * 
+     * @param handler a timeout handler.
+     */
+    public void addTimeoutHandler(TimeoutHandler handler) {
+        timeoutHandlers.add(handler);
+    }
+    
+    /**
+     * inserts a new timeout handler at the beginning of the timeout handler
+     * list.
+     * 
+     * @param handler a timeout handler.
+     */
+    public void addTimeoutHandlerFirst(TimeoutHandler handler) {
+        timeoutHandlers.add(0, handler);
+    }
+    
+    /**
+     * Removes all timeout handlers from the timeout handler list. 
+     */
+    public void clearTimeoutHandlers() {
+        this.timeoutHandlers.clear();
+    }
+    
+    /**
+     * Schedules this timeout task.
+     */
+    public void schedule() {
+        timer.schedule(this, timeout);
+    }
+
+    /**
+     * Notifies all timeout handlers about the scheduled timeout.
+     */
+    @Override
+    public void run() {
+        for (TimeoutHandler observer : timeoutHandlers) {
+            observer.timeout(this);
+        }
+    }
+
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+/**
+ * Implemented by classes that handle timeout notifications.
+ * 
+ * @author Martin Krasser
+ */
+public interface TimeoutHandler {
+
+    /**
+     * Handles a timeout notification.
+     * 
+     * @param timeout the timer task that caused this timeout notification.
+     */
+    void timeout(Timeout timeout);
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.Resequencer;
+import org.apache.servicemix.eip.support.resequence.DefaultComparator;
+import org.apache.servicemix.tck.MessageList;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class ResequencerTest extends AbstractEIPTest {
+
+    private static final String RESEQUENCER_NAME = "resequencer";
+    private static final String TARGET_NAME = "target";
+    private static final String SEQNUM_KEY = "seqnum";
+    
+    private Resequencer resequencer;
+    
+    public void setUp() throws Exception {
+        super.setUp();
+        DefaultComparator comparator = new DefaultComparator();
+        comparator.setSequenceNumberAsString(false);
+        comparator.setSequenceNumberKey(SEQNUM_KEY);
+        resequencer = new Resequencer();
+        resequencer.setTarget(createServiceExchangeTarget(new QName(TARGET_NAME)));
+        resequencer.setComparator(comparator);
+        resequencer.setCapacity(100);
+        resequencer.setTimeout(500L);
+        configurePattern(resequencer);
+        activateComponent(resequencer, RESEQUENCER_NAME);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testAsync() throws Exception {
+        int numMessages = 5;
+        ReceiverComponent receiver = activateReceiver(TARGET_NAME);
+        client.send(createTestMessageExchange(4));
+        client.send(createTestMessageExchange(1));
+        client.send(createTestMessageExchange(3));
+        client.send(createTestMessageExchange(5));
+        client.send(createTestMessageExchange(2));
+        MessageList ml = receiver.getMessageList();
+        ml.waitForMessagesToArrive(numMessages);
+        assertEquals("wrong number of messages", numMessages, ml.getMessageCount());
+        for (int i = 0; i < numMessages; i++) {
+            assertSequenceProperties((NormalizedMessage)ml.getMessages().get(i), i + 1);
+        }
+        for (int i = 0; i < numMessages; i++) {
+            MessageExchange me = (InOnly)client.receive();
+            assertEquals(ExchangeStatus.DONE, me.getStatus());
+        }
+    }
+    
+    private MessageExchange createTestMessageExchange(long num) throws Exception {
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName(RESEQUENCER_NAME));
+        me.getInMessage().setProperty(SEQNUM_KEY, new Long(num));
+        return me;
+    }
+    
+    private static void assertSequenceProperties(NormalizedMessage m, long num) {
+        Long l = (Long)m.getProperty(SEQNUM_KEY);
+        assertEquals("wrong sequence number", num, l.longValue());
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.Resequencer;
+import org.apache.servicemix.eip.support.resequence.DefaultComparator;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.MessageList;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class ResequencerTxTest extends AbstractEIPTransactionalTest {
+
+    private static final String RESEQUENCER_NAME = "resequencer";
+    private static final String TARGET_NAME = "target";
+    private static final String SEQNUM_KEY = "seqnum";
+    
+    private Resequencer resequencer;
+    
+    public void setUp() throws Exception {
+        super.setUp();
+        DefaultComparator comparator = new DefaultComparator();
+        comparator.setSequenceNumberAsString(false);
+        comparator.setSequenceNumberKey(SEQNUM_KEY);
+        resequencer = new Resequencer();
+        resequencer.setTarget(createServiceExchangeTarget(new QName(TARGET_NAME)));
+        resequencer.setComparator(comparator);
+        resequencer.setCapacity(100);
+        resequencer.setTimeout(1500L);
+        configurePattern(resequencer);
+        activateComponent(resequencer, RESEQUENCER_NAME);
+    }
+
+    public void testAsyncTx() throws Exception {
+        int numMessages = 5;
+        ReceiverComponent receiver = activateReceiver(TARGET_NAME);
+        tm.begin();
+        client.send(createTestMessageExchange(4));
+        client.send(createTestMessageExchange(1));
+        client.send(createTestMessageExchange(3));
+        client.send(createTestMessageExchange(5));
+        client.send(createTestMessageExchange(2));
+        tm.commit();
+        MessageList ml = receiver.getMessageList();
+        ml.waitForMessagesToArrive(numMessages);
+        assertEquals("wrong number of messages", numMessages, ml.getMessageCount());
+        for (int i = 0; i < numMessages; i++) {
+            assertSequenceProperties((NormalizedMessage)ml.getMessages().get(i), i + 1);
+        }
+        for (int i = 0; i < numMessages; i++) {
+            MessageExchange me = (InOnly)client.receive();
+            assertEquals(ExchangeStatus.DONE, me.getStatus());
+        }
+    }
+    
+    private MessageExchange createTestMessageExchange(long num) throws Exception {
+        InOnly me = client.createInOnlyExchange();
+        me.setService(new QName(RESEQUENCER_NAME));
+        me.getInMessage().setProperty(SEQNUM_KEY, new Long(num));
+        me.getInMessage().setContent(new StringSource("<number>" + num + "</number>"));
+        return me;
+    }
+    
+    private static void assertSequenceProperties(NormalizedMessage m, long num) throws Exception {
+        Long l = (Long)m.getProperty(SEQNUM_KEY);
+        if (l == null) {
+            // get sequence number from message content
+            long n = getNumber(new SourceTransformer().toString(m.getContent()));
+            assertEquals("wrong sequence number", num, n);
+            // TODO: investigate JDK 1.6.0_01 issues here
+            // When using JDK 1.6.0_01 then messages transported to receiver
+            // conponent don't have any properties set. This is only the case
+            // if this test is running with all other EIP tests. When running
+            // alone, messages contain properties as expected. When using  
+            // JDK 1.5.0_12 messages always contain properties as expected.
+        } else {
+            // get sequence number from message properties
+            assertEquals("wrong sequence number", num, l.longValue());
+        }
+    }
+    
+    private static long getNumber(String content) {
+        int idx1 = content.indexOf("<number>") + 8;
+        int idx2 = content.indexOf("</number>");
+        return Long.parseLong(content.substring(idx1, idx2));
+    }
+
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+public class IntegerComparator implements SequenceElementComparator<Integer> {
+
+    public boolean predecessor(Integer o1, Integer o2) {
+        return o1.intValue() == (o2.intValue() - 1);
+    }
+
+    public boolean successor(Integer o1, Integer o2) {
+        return o2.intValue() == (o1.intValue() - 1);
+    }
+
+    public int compare(Integer o1, Integer o2) {
+        return o1.compareTo(o2);
+    }
+
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+public class ResequencerEngineTest extends TestCase {
+
+    private static final boolean IGNORE_LOAD_TESTS = true;
+    
+    private ResequencerEngine<Integer> resequencer;
+    
+    private LinkedBlockingQueue<Integer> queue;
+    
+    public void setUp() throws Exception {
+    }
+
+    public void tearDown() throws Exception {
+        if (resequencer != null) {
+            resequencer.stop();
+        }
+    }
+
+    public void testTimeout1() throws InterruptedException {
+        initResequencer(500, 10);
+        resequencer.put(4);
+        assertNull(queue.poll(250, TimeUnit.MILLISECONDS));
+        assertEquals((Integer)4, queue.take());
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testTimeout2() throws InterruptedException {
+        initResequencer(500, 10);
+        resequencer.setLastDelivered(2);
+        resequencer.put(4);
+        assertNull(queue.poll(250, TimeUnit.MILLISECONDS));
+        assertEquals((Integer)4, queue.take());
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testTimeout3() throws InterruptedException {
+        initResequencer(500, 10);
+        resequencer.setLastDelivered(3);
+        resequencer.put(4);
+        assertEquals((Integer)4, queue.poll(250, TimeUnit.MILLISECONDS));
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testTimout4() throws InterruptedException {
+        initResequencer(500, 10);
+        resequencer.setLastDelivered(2);
+        resequencer.put(4);
+        resequencer.put(3);
+        assertEquals((Integer)3, queue.poll(125, TimeUnit.MILLISECONDS));
+        assertEquals((Integer)4, queue.poll(125, TimeUnit.MILLISECONDS));
+        assertEquals((Integer)4, resequencer.getLastDelivered());
+    }
+    
+    public void testRandom() throws InterruptedException {
+        if (IGNORE_LOAD_TESTS) {
+            return;
+        }
+        int input = 1000;
+        initResequencer(1000, 1000);
+        List<Integer> list = new LinkedList<Integer>();
+        for (int i = 0; i < input; i++) {
+            list.add(i);
+        }
+        Random random = new Random(System.currentTimeMillis());
+        System.out.println("Input sequence:");
+        long millis = System.currentTimeMillis();
+        for (int i = input; i > 0; i--) {
+            int r = random.nextInt(i);
+            int next = list.remove(r);
+            System.out.print(next + " ");
+            resequencer.put(next); 
+        }
+        System.out.println("\nOutput sequence:");
+        for (int i = 0; i < input; i++) {
+            System.out.print(queue.take() + " ");
+        }
+        millis = System.currentTimeMillis() - millis;
+        System.out.println("\nDuration = " + millis + " ms");
+    }
+    
+    public void testReverse1() throws InterruptedException {
+        if (IGNORE_LOAD_TESTS) {
+            return;
+        }
+        testReverse(10);
+    }
+    
+    public void testReverse2() throws InterruptedException {
+        if (IGNORE_LOAD_TESTS) {
+            return;
+        }
+        testReverse(100);
+    }
+    
+    private void testReverse(int capacity) throws InterruptedException {
+        initResequencer(1, capacity);
+        for (int i = 99; i >= 0; i--) {
+            resequencer.put(i);
+        }
+        System.out.println("\nOutput sequence:");
+        for (int i = 0; i < 100; i++) {
+            System.out.print(queue.take() + " ");
+        }
+    }
+    
+    private void initResequencer(long timeout, int capacity) {
+        queue = new LinkedBlockingQueue<Integer>();
+        resequencer = new ResequencerEngine<Integer>(new IntegerComparator(), capacity);
+        resequencer.setOutQueue(queue);
+        resequencer.setTimeout(timeout);
+    }
+    
+}

Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import junit.framework.TestCase;
+
+public class SequenceTest extends TestCase {
+
+    private TestObject e1;
+    private TestObject e2;
+    private TestObject e3;
+    
+    private Sequence<TestObject> set;
+
+    public void setUp() throws Exception {
+        e1 = new TestObject(3);
+        e2 = new TestObject(4);
+        e3 = new TestObject(7);
+        set = new Sequence<TestObject>(new TestComparator());
+        set.add(e3);
+        set.add(e1);
+        set.add(e2);
+    }
+
+    public void tearDown() throws Exception {
+    }
+
+    public void testPredecessor() {
+        assertEquals(e1, set.predecessor(e2));
+        assertEquals(null, set.predecessor(e1));
+        assertEquals(null, set.predecessor(e3));
+    }
+
+    public void testSuccessor() {
+        assertEquals(e2, set.successor(e1));
+        assertEquals(null, set.successor(e2));
+        assertEquals(null, set.successor(e3));
+    }
+
+}