You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/08/01 07:03:19 UTC
svn commit: r561660 [1/2] - in /incubator/servicemix/trunk:
core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/
core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/
deployables/serviceengines/servicemix-eip/src/main/java/or...
Author: gnodet
Date: Tue Jul 31 22:03:17 2007
New Revision: 561660
URL: http://svn.apache.org/viewvc?view=rev&rev=561660
Log:
SM-1008: Reseqencer pattern for EIP component. Patch provided by Martin Krasser, thx\!
Added:
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java
incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/TestComparator.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/TestComparatorTest.java
incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/TestObject.java
Added: incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/MessageCopier.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.ByteArrayDataSource;
+import org.apache.servicemix.jbi.util.FileUtil;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+/**
+ * A thread-safe copier for NormalizedMessage onjects.
+ *
+ * @author Martin Krasser
+ */
+public class MessageCopier {
+
+ private boolean copySubject;
+ private boolean copyContent;
+ private boolean copyProperties;
+ private boolean copyAttachments;
+
+ /**
+ * Creates a new message copier instance that creates full (deep) message
+ * copies.
+ */
+ public MessageCopier() {
+ this(true, true, true, true);
+ }
+
+ /**
+ * Create a new message copier instance.
+ *
+ * @param copySubject <code>true</code> if subject shall be copied.
+ * @param copyContent <code>true</code> if content shall be copied
+ * @param copyProperties <code>true</code> if properties shall be copied
+ * @param copyAttachments <code>true</code> if attachments shall be copied
+ */
+ public MessageCopier(boolean copySubject, boolean copyContent, boolean copyProperties, boolean copyAttachments) {
+ super();
+ this.copySubject = copySubject;
+ this.copyContent = copyContent;
+ this.copyProperties = copyProperties;
+ this.copyAttachments = copyAttachments;
+ }
+
+ /**
+ * Copies messages under consideration of the <code>copySubject</code>,
+ * <code>copyContent</code>, <code>copyProperties</code>,
+ * <code>copyAttachments</code> properties.
+ *
+ * @param message original message.
+ * @return a copy of the original message.
+ * @throws MessagingException if a system-level exception occurs.
+ */
+ public NormalizedMessage copy(NormalizedMessage message) throws MessagingException {
+ NormalizedMessage copy = new MessageUtil.NormalizedMessageImpl();
+ if (copySubject) {
+ copySubject(message, copy);
+ }
+ if (copyContent) {
+ copyContent(message, copy);
+ }
+ if (copyProperties) {
+ copyProperties(message, copy);
+ }
+ if (copyAttachments) {
+ copyAttachments(message, copy);
+ }
+ return copy;
+ }
+
+ public boolean isCopyAttachments() {
+ return copyAttachments;
+ }
+
+ public boolean isCopyContent() {
+ return copyContent;
+ }
+
+ public boolean isCopyProperties() {
+ return copyProperties;
+ }
+
+ public boolean isCopySubject() {
+ return copySubject;
+ }
+
+ private static void copySubject(NormalizedMessage from, NormalizedMessage to) {
+ to.setSecuritySubject(from.getSecuritySubject());
+ }
+
+ private static void copyContent(NormalizedMessage from, NormalizedMessage to) throws MessagingException{
+ String str = null;
+ try {
+ str = new SourceTransformer().toString(from.getContent());
+ } catch (Exception e) {
+ throw new MessagingException(e);
+ }
+ if (str != null) {
+ to.setContent(new StringSource(str));
+ }
+ }
+
+ private static void copyProperties(NormalizedMessage from, NormalizedMessage to) {
+ for (Object name : from.getPropertyNames()) {
+ to.setProperty((String)name, from.getProperty((String)name));
+ }
+ }
+
+ private static void copyAttachments(NormalizedMessage from, NormalizedMessage to) throws MessagingException{
+ for (Object name : from.getAttachmentNames()) {
+ DataHandler handler = from.getAttachment((String)name);
+ DataSource source = handler.getDataSource();
+ if (source instanceof ByteArrayDataSource == false) {
+ DataSource copy = copyDataSource(source);
+ handler = new DataHandler(copy);
+ }
+ to.addAttachment((String)name, handler);
+ }
+ }
+
+ private static DataSource copyDataSource(DataSource source) throws MessagingException {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ FileUtil.copyInputStream(source.getInputStream(), baos);
+ ByteArrayDataSource bads = new ByteArrayDataSource(baos.toByteArray(), source.getContentType());
+ bads.setName(source.getName());
+ return bads;
+ } catch (IOException e) {
+ throw new MessagingException(e);
+ }
+ }
+
+}
Added: incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java (added)
+++ incubator/servicemix/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/util/MessageCopierTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.util;
+
+import javax.activation.DataHandler;
+import javax.activation.DataSource;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.security.auth.Subject;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.util.ByteArrayDataSource;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+public class MessageCopierTest extends TestCase {
+
+ private NormalizedMessage message;
+
+ private Subject subject;
+
+ public void setUp() throws Exception {
+ subject = new Subject();
+ message = new MessageUtil.NormalizedMessageImpl();
+ message.setContent(new StringSource("<doc>s1<doc>"));
+ message.addAttachment("a", new DataHandler(createDataSource("s2")));
+ message.setProperty("p", "s3");
+ message.setSecuritySubject(subject);
+ }
+
+ public void tearDown() throws Exception {
+ }
+
+ public void testCopyContent() throws Exception {
+ MessageCopier copier = new MessageCopier(false, true, false, false);
+ NormalizedMessage copy = copier.copy(message);
+ String content = new SourceTransformer().toString(copy.getContent());
+ assertEquals("wrong content", "<doc>s1<doc>", content);
+ assertEquals("wrong attachment", null, copy.getAttachment("a"));
+ assertEquals("wrong property", null, copy.getProperty("p"));
+ assertEquals("wrong subject", null, copy.getSecuritySubject());
+ }
+
+ public void testCopyAttachment() throws Exception {
+ MessageCopier copier = new MessageCopier(false, false, false, true);
+ NormalizedMessage copy = copier.copy(message);
+ String attachment = IOUtils.toString(copy.getAttachment("a").getInputStream());
+ assertEquals("wrong content", null, copy.getContent());
+ assertEquals("wrong attachment", "s2", attachment);
+ assertEquals("wrong property", null, copy.getProperty("p"));
+ assertEquals("wrong subject", null, copy.getSecuritySubject());
+ }
+
+ public void testCopyProperties() throws Exception {
+ MessageCopier copier = new MessageCopier(false, false, true, false);
+ NormalizedMessage copy = copier.copy(message);
+ assertEquals("wrong content", null, copy.getContent());
+ assertEquals("wrong attachment", null, copy.getAttachment("a"));
+ assertEquals("wrong property", "s3", copy.getProperty("p"));
+ assertEquals("wrong subject", null, copy.getSecuritySubject());
+ }
+
+ public void testCopySubject() throws Exception {
+ MessageCopier copier = new MessageCopier(true, false, false, false);
+ NormalizedMessage copy = copier.copy(message);
+ assertEquals("wrong content", null, copy.getContent());
+ assertEquals("wrong attachment", null, copy.getAttachment("a"));
+ assertEquals("wrong property", null, copy.getProperty("p"));
+ assertEquals("wrong subject", subject, copy.getSecuritySubject());
+ }
+
+ private static DataSource createDataSource(String text) {
+ return new ByteArrayDataSource(text.getBytes(), "text/plain");
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/patterns/Resequencer.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.patterns;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+
+import org.apache.servicemix.eip.support.resequence.DefaultComparator;
+import org.apache.servicemix.eip.support.resequence.ResequencerBase;
+import org.apache.servicemix.eip.support.resequence.ResequencerEngine;
+import org.apache.servicemix.eip.support.resequence.SequenceElementComparator;
+import org.apache.servicemix.eip.support.resequence.SequenceReader;
+import org.apache.servicemix.eip.support.resequence.SequenceSender;
+import org.apache.servicemix.executors.Executor;
+
+/**
+ * @author Martin Krasser
+ *
+ * @org.apache.xbean.XBean element="resequencer"
+ */
+public class Resequencer extends ResequencerBase implements SequenceSender {
+
+ private ResequencerEngine<MessageExchange> reseq;
+
+ private SequenceReader reader;
+
+ private Executor executor;
+
+ private int capacity;
+
+ private long timeout;
+
+ private SequenceElementComparator<MessageExchange> comparator;
+
+ public Resequencer() {
+ this.reader = new SequenceReader(this);
+ this.comparator = new DefaultComparator();
+ }
+
+ public void setCapacity(int capacity) {
+ this.capacity = capacity;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ public void setComparator(SequenceElementComparator<MessageExchange> comparator) {
+ this.comparator = comparator;
+ }
+
+ @Override
+ public void start() throws Exception {
+ super.start();
+ if (executor == null) {
+ executor = getServiceUnit().getComponent().getExecutor();
+ }
+ BlockingQueue<MessageExchange> queue = new LinkedBlockingQueue<MessageExchange>();
+ reseq = new ResequencerEngine<MessageExchange>(comparator, capacity);
+ reseq.setTimeout(timeout);
+ reseq.setOutQueue(queue);
+ reader.setQueue(queue);
+ reader.start(executor);
+ }
+
+ @Override
+ public void stop() throws Exception {
+ reseq.stop();
+ reader.stop();
+ super.stop();
+ }
+
+ public void sendSync(MessageExchange exchange) throws MessagingException {
+ super.sendSync(exchange);
+ }
+
+ public void sendSync(List<MessageExchange> exchanges) throws MessagingException {
+ for (MessageExchange exchange : exchanges) {
+ sendSync(exchange);
+ }
+ }
+
+ @Override
+ protected void processSync(MessageExchange exchange) throws Exception {
+ fail(exchange, new UnsupportedOperationException("synchronous resequencing not supported"));
+ }
+
+ @Override
+ protected void processAsync(MessageExchange exchange) throws Exception {
+ validateMessageExchange(exchange);
+ if (exchange.getStatus() == ExchangeStatus.DONE) {
+ return;
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ return;
+ } else if (exchange.getFault() != null) {
+ done(exchange);
+ return;
+ }
+ processMessage(exchange);
+ done(exchange);
+ }
+
+ private void processMessage(MessageExchange sourceExchange) throws MessagingException, InterruptedException {
+ NormalizedMessage source = sourceExchange.getMessage("in");
+ NormalizedMessage copy = getMessageCopier().copy(source);
+ MessageExchange targetExchange = createTargetExchange(copy, sourceExchange.getPattern());
+ // add target exchange to resequencer (blocking if capacity is reached)
+ reseq.put(targetExchange);
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/DefaultComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import javax.jbi.messaging.MessageExchange;
+
+/**
+ * Compares {@link MessageExchange} sequence elements based on sequence numbers
+ * defined by their in-{@link NormalizedMessage}s. This comparator works on
+ * sequence numbers of type {@link Long}. Sequence numbers must be stored as
+ * {@link NormalizedMessage} properties. The property name under which the
+ * sequence number is stored is configured via this comparator's
+ * <code>sequenceNumberKey</code> property.
+ *
+ * @author Martin Krasser
+ *
+ * @org.apache.xbean.XBean element="default-comparator"
+ */
+public class DefaultComparator implements SequenceElementComparator<MessageExchange> {
+
+ public static final String SEQUENCE_NUMBER_KEY = "org.apache.servicemix.eip.sequence.number";
+
+ private static final String IN = "in";
+
+ private String sequenceNumberKey;
+
+ private boolean sequenceNumberAsString;
+
+ public DefaultComparator() {
+ sequenceNumberKey = SEQUENCE_NUMBER_KEY;
+ sequenceNumberAsString = false;
+ }
+
+ public String getSequenceNumberKey() {
+ return sequenceNumberKey;
+ }
+
+ public void setSequenceNumberKey(String sequenceNumberPropertyName) {
+ this.sequenceNumberKey = sequenceNumberPropertyName;
+ }
+
+ public boolean isSequenceNumberAsString() {
+ return sequenceNumberAsString;
+ }
+
+ public void setSequenceNumberAsString(boolean sequenceNumberAsString) {
+ this.sequenceNumberAsString = sequenceNumberAsString;
+ }
+
+ public boolean predecessor(MessageExchange o1, MessageExchange o2) {
+ long n1 = getSequenceNumber(o1).longValue();
+ long n2 = getSequenceNumber(o2).longValue();
+ return n1 == (n2 - 1L);
+ }
+
+ public boolean successor(MessageExchange o1, MessageExchange o2) {
+ long n1 = getSequenceNumber(o1).longValue();
+ long n2 = getSequenceNumber(o2).longValue();
+ return n2 == (n1 - 1L);
+ }
+
+ public int compare(MessageExchange o1, MessageExchange o2) {
+ Long n1 = getSequenceNumber(o1);
+ Long n2 = getSequenceNumber(o2);
+ return n1.compareTo(n2);
+ }
+
+ private Long getSequenceNumber(MessageExchange exchange) {
+ Object number = exchange.getMessage(IN).getProperty(sequenceNumberKey);
+ if (sequenceNumberAsString) {
+ return new Long((String)number);
+ } else {
+ return (Long)number;
+ }
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Element.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+/**
+ * A container for objects to be resequenced. This container can be scheduled
+ * for timing out. Non-scheduled objects or already timed-out objects are ready
+ * for being released by the {@link ResequencerEngine}.
+ *
+ * @author Martin Krasser
+ */
+class Element<E> implements TimeoutHandler {
+
+ /**
+ * The contained object.
+ */
+ private E object;
+
+ /**
+ * Not <code>null</code> if this element is currently beeing scheduled for
+ * timing out.
+ */
+ private Timeout timeout;
+
+ /**
+ * Creates a new container instance.
+ *
+ * @param object contained object.
+ */
+ public Element(E object) {
+ this.object = object;
+ }
+
+ /**
+ * Returns the contained object.
+ *
+ * @return the contained object.
+ */
+ public E getObject() {
+ return object;
+ }
+
+ /**
+ * Returns <code>true</code> if this element is currently scheduled for
+ * timing out.
+ *
+ * @return <code>true</code> if scheduled or <code>false</code> if not
+ * scheduled or already timed-out.
+ */
+ public synchronized boolean scheduled() {
+ return timeout != null;
+ }
+
+ /**
+ * Schedules the given timeout task. Before this methods calls the
+ * {@link Timeout#schedule()} method it adds this element as timeout
+ * listener.
+ *
+ * @param t a timeout task.
+ */
+ public synchronized void schedule(Timeout t) {
+ this.timeout = t;
+ this.timeout.addTimeoutHandlerFirst(this);
+ this.timeout.schedule();
+ }
+
+ /**
+ * Cancels the scheduled timeout for this element. If this element is not
+ * scheduled or has already timed-out this method has no effect.
+ */
+ public synchronized void cancel() {
+ if (timeout != null) {
+ timeout.cancel();
+ }
+ timeout(null);
+ }
+
+ /**
+ * Marks this element as timed-out.
+ *
+ * @param t timeout task that caused the notification.
+ */
+ public synchronized void timeout(Timeout t) {
+ this.timeout = null;
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ElementComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+/**
+ * A strategy for comparing {@link Element} instances. This strategy uses
+ * another {@link SequenceElementComparator} instance for comparing elements
+ * contained by {@link Element} instances.
+ *
+ * @author Martin Krasser
+ */
+class ElementComparator<E> implements SequenceElementComparator<Element<E>> {
+
+ /**
+ * A sequence element comparator this comparator delegates to.
+ */
+ private SequenceElementComparator<E> comparator;
+
+ /**
+ * Creates a new element comparator instance.
+ *
+ * @param comparator a sequence element comparator this comparator delegates
+ * to.
+ */
+ public ElementComparator(SequenceElementComparator<E> comparator) {
+ this.comparator = comparator;
+ }
+
+ /**
+ * @see com.icw.ehf.integration.common.collection.SequenceElementComparator#predecessor(java.lang.Object, java.lang.Object)
+ */
+ public boolean predecessor(Element<E> o1, Element<E> o2) {
+ return comparator.predecessor(o1.getObject(), o2.getObject());
+ }
+
+ /**
+ * @see com.icw.ehf.integration.common.collection.SequenceElementComparator#successor(java.lang.Object, java.lang.Object)
+ */
+ public boolean successor(Element<E> o1, Element<E> o2) {
+ return comparator.successor(o1.getObject(), o2.getObject());
+ }
+
+ /**
+ * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+ */
+ public int compare(Element<E> o1, Element<E> o2) {
+ return comparator.compare(o1.getObject(), o2.getObject());
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerBase.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.net.URI;
+
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.messaging.RobustInOnly;
+
+import org.apache.servicemix.eip.EIPEndpoint;
+import org.apache.servicemix.eip.support.ExchangeTarget;
+import org.apache.servicemix.jbi.util.MessageCopier;
+import org.apache.servicemix.jbi.util.MessageUtil;
+
+public abstract class ResequencerBase extends EIPEndpoint {
+
+ private MessageCopier messageCopier = new MessageCopier();
+
+ private ExchangeTarget target;
+
+ public MessageCopier getMessageCopier() {
+ return messageCopier;
+ }
+
+ public ExchangeTarget getTarget() {
+ return target;
+ }
+
+ public void setTarget(ExchangeTarget target) {
+ this.target = target;
+ }
+
+ @Override
+ public void validate() throws DeploymentException {
+ super.validate();
+ if (target == null) {
+ throw new IllegalArgumentException("target must be set to a valid ExchangeTarget");
+ }
+ }
+
+ public void validateMessageExchange(MessageExchange exchange) throws MessagingException {
+ if ((exchange instanceof InOnly) || (exchange instanceof RobustInOnly)) {
+ return;
+ }
+ fail(exchange, new UnsupportedOperationException("Use an InOnly or RobustInOnly MEP"));
+ }
+
+ protected MessageExchange createTargetExchange(NormalizedMessage message, URI exchangePattern) throws MessagingException {
+ MessageExchange targetExchange = getExchangeFactory().createExchange(exchangePattern);
+ target.configureTarget(targetExchange, getContext());
+ MessageUtil.transferToIn(message, targetExchange);
+ return targetExchange;
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/ResequencerEngine.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.Queue;
+import java.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Resequences elements based on a given {@link SequenceElementComparator}.
+ * This resequencer is designed for resequencing element streams. Resequenced
+ * elements are added to an output {@link Queue}. The resequencer is configured
+ * via the <code>timeout</code> and <code>capacity</code> properties.
+ *
+ * <ul>
+ * <li><code>timeout</code>. Defines the timeout (in milliseconds) for a
+ * given element managed by this resequencer. An out-of-sequence element can
+ * only be marked as <i>ready-for-delivery</i> if it either times out or if it
+ * has an immediate predecessor (in that case it is in-sequence). If an
+ * immediate predecessor of a waiting element arrives the timeout task for the
+ * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
+ * <p>
+ * If the maximum out-of-sequence time between elements within a stream is
+ * known, the <code>timeout</code> value should be set to this value. In this
+ * case it is guaranteed that all elements of a stream will be delivered in
+ * sequence to the output queue. However, large <code>timeout</code> values
+ * might require a very high resequencer <code>capacity</code> which might be
+ * in conflict with available memory resources. The lower the
+ * <code>timeout</code> value is compared to the out-of-sequence time between
+ * elements within a stream the higher the probability is for out-of-sequence
+ * elements delivered by this resequencer.</li>
+ * <li><code>capacity</code>. The capacity of this resequencer.</li>
+ * </ul>
+ *
+ * Whenever a timeout for a certain element occurs or an element has been added
+ * to this resequencer a delivery attempt is started. If a (sub)sequence of
+ * elements is <i>ready-for-delivery</i> then they are added to output queue.
+ * <p>
+ * The resequencer remembers the last-delivered element. If an element arrives
+ * which is the immediate successor of the last-delivered element it will be
+ * delivered immediately and the last-delivered element is adjusted accordingly.
+ * If the last-delivered element is <code>null</code> i.e. the resequencer was
+ * newly created the first arriving element will wait <code>timeout</code>
+ * milliseconds for being delivered to the output queue.
+ *
+ * @author Martin Krasser
+ */
+public class ResequencerEngine<E> implements TimeoutHandler {
+
+ private static final Log LOG = LogFactory.getLog(ResequencerEngine.class);
+
+ private long timeout;
+
+ private int capacity;
+
+ private Queue<E> outQueue;
+
+ private Element<E> lastDelivered;
+
+ /**
+ * A sequence of elements for sorting purposes.
+ */
+ private Sequence<Element<E>> sequence;
+
+ /**
+ * A timer for scheduling timeout notifications.
+ */
+ private Timer timer;
+
+ /**
+ * Creates a new resequencer instance with a default timeout of 2000
+ * milliseconds. The capacity is set to {@link Integer#MAX_VALUE}.
+ *
+ * @param comparator a sequence element comparator.
+ */
+ public ResequencerEngine(SequenceElementComparator<E> comparator) {
+ this(comparator, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Creates a new resequencer instance with a default timeout of 2000
+ * milliseconds.
+ *
+ * @param comparator a sequence element comparator.
+ * @param capacity the capacity of this resequencer.
+ */
+ public ResequencerEngine(SequenceElementComparator<E> comparator, int capacity) {
+ this.timer = new Timer("Resequencer Timer");
+ this.sequence = createSequence(comparator);
+ this.capacity = capacity;
+ this.timeout = 2000L;
+ this.lastDelivered = null;
+ }
+
+ /**
+ * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
+ */
+ public void stop() {
+ this.timer.cancel();
+ }
+
+ /**
+ * Returns the output queue.
+ *
+ * @return the output queue.
+ */
+ public Queue<E> getOutQueue() {
+ return outQueue;
+ }
+
+ /**
+ * Sets the output queue.
+ *
+ * @param outQueue output queue.
+ */
+ public void setOutQueue(Queue<E> outQueue) {
+ this.outQueue = outQueue;
+ }
+
+ /**
+ * Returns this resequencer's timeout value.
+ *
+ * @return the timeout in milliseconds.
+ */
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Sets this sequencer's timeout value.
+ *
+ * @param timeout the timeout in milliseconds.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
+ * Handles a timeout notification by starting a delivery attempt.
+ *
+ * @param timout timeout task that caused the notification.
+ */
+ public synchronized void timeout(Timeout timout) {
+ try {
+ while (deliver()) {
+ // work done in deliver()
+ }
+ } catch (RuntimeException e) {
+ LOG.error("error during delivery", e);
+ }
+ }
+
+ /**
+ * Adds an element to this resequencer throwing an exception if the maximum
+ * capacity is reached.
+ *
+ * @param o element to be resequenced.
+ * @throws IllegalStateException if the element cannot be added at this time
+ * due to capacity restrictions.
+ */
+ public synchronized void add(E o) {
+ if (sequence.size() >= capacity) {
+ throw new IllegalStateException("maximum capacity is reached");
+ }
+ insert(o);
+ }
+
+ /**
+ * Adds an element to this resequencer waiting, if necessary, until capacity
+ * becomes available.
+ *
+ * @param o element to be resequenced.
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ public synchronized void put(E o) throws InterruptedException {
+ if (sequence.size() >= capacity) {
+ wait();
+ }
+ insert(o);
+ }
+
+ /**
+ * Returns the last delivered element.
+ *
+ * @return the last delivered element or <code>null</code> if no delivery
+ * has been made yet.
+ */
+ E getLastDelivered() {
+ if (lastDelivered == null) {
+ return null;
+ }
+ return lastDelivered.getObject();
+ }
+
+ /**
+ * Sets the last delivered element. This is for testing purposes only.
+ *
+ * @param o an element.
+ */
+ void setLastDelivered(E o) {
+ lastDelivered = new Element<E>(o);
+ }
+
+ /**
+ * Inserts the given element into this resequencing queue (sequence). If the
+ * element is not ready for immediate delivery and has no immediate
+ * presecessor then it is scheduled for timing out. After being timed out it
+ * is ready for delivery.
+ *
+ * @param o an element.
+ */
+ private void insert(E o) {
+ // wrap object into internal element
+ Element<E> element = new Element<E>(o);
+ // add element to sequence in proper order
+ sequence.add(element);
+
+ Element<E> successor = sequence.successor(element);
+
+ // check if there is an immediate successor and cancel
+ // timer task (no need to wait any more for timeout)
+ if (successor != null) {
+ successor.cancel();
+ }
+
+ // start delivery if current element is successor of last delivered element
+ if (successorOfLastDelivered(element)) {
+ // nothing to schedule
+ } else if (sequence.predecessor(element) != null) {
+ // nothing to schedule
+ } else {
+ Timeout t = defineTimeout();
+ element.schedule(t);
+ }
+
+ // start delivery
+ while (deliver()) {
+ // work done in deliver()
+ }
+ }
+
+ /**
+ * Attempts to deliver a single element from the head of the resequencer
+ * queue (sequence). Only elements which have not been scheduled for timing
+ * out or which already timed out can be delivered.
+ *
+ * @return <code>true</code> if the element has been delivered
+ * <code>false</code> otherwise.
+ */
+ private boolean deliver() {
+ if (sequence.size() == 0) {
+ return false;
+ }
+ // inspect element with lowest sequence value
+ Element<E> element = sequence.first();
+
+ // if element is scheduled do not deliver and return
+ if (element.scheduled()) {
+ return false;
+ }
+
+ // remove deliverable element from sequence
+ sequence.remove(element);
+
+ // set the delivered element to last delivered element
+ lastDelivered = element;
+
+ // notify a waiting thread that capacity is available
+ notify();
+
+ // add element to output queue
+ outQueue.add(element.getObject());
+
+ // element has been delivered
+ return true;
+ }
+
+ /**
+ * Returns <code>true</code> if the given element is the immediate
+ * successor of the last delivered element.
+ *
+ * @param element an element.
+ * @return <code>true</code> if the given element is the immediate
+ * successor of the last delivered element.
+ */
+ private boolean successorOfLastDelivered(Element<E> element) {
+ if (lastDelivered == null) {
+ return false;
+ }
+ if (sequence.comparator().successor(element, lastDelivered)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Creates a timeout task based on the timeout setting of this resequencer.
+ *
+ * @return a new timeout task.
+ */
+ private Timeout defineTimeout() {
+ Timeout result = new Timeout(timer, timeout);
+ result.addTimeoutHandler(this);
+ return result;
+ }
+
+ private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
+ return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Sequence.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.TreeSet;
+
+/**
+ * A sorted set of elements with additional methods for obtaining immediate
+ * successors and immediate predecessors of a given element in the sequence.
+ * Successors and predecessors are calculated by using a
+ * {@link SequenceElementComparator}.
+ *
+ * @author Martin Krasser
+ */
+public class Sequence<E> extends TreeSet<E> {
+
+ private static final long serialVersionUID = 5647393631147741711L;
+
+ private SequenceElementComparator<E> comparator;
+
+ /**
+ * Creates a new {@link Sequence} instance.
+ *
+ * @param comparator a strategy for comparing elements of this sequence.
+ */
+ public Sequence(SequenceElementComparator<E> comparator) {
+ super(comparator);
+ this.comparator = comparator;
+ }
+
+ /**
+ * Returns the immediate predecessor of the given element in this sequence
+ * or <code>null</code> if no predecessor exists.
+ *
+ * @param e an element which is compared to elements of this sequence.
+ * @return an element of this sequence or <code>null</code>.
+ */
+ public E predecessor(E e) {
+ E elem = lower(e);
+ if (elem == null) {
+ return null;
+ }
+ if (comparator.predecessor(elem, e)) {
+ return elem;
+ }
+ return null;
+ }
+
+ /**
+ * Returns the immediate successor of the given element in this sequence
+ * or <code>null</code> if no successor exists.
+ *
+ * @param e an element which is compared to elements of this sequence.
+ * @return an element of this sequence or <code>null</code>.
+ */
+ public E successor(E e) {
+ E elem = higher(e);
+ if (elem == null) {
+ return null;
+ }
+ if (comparator.successor(elem, e)) {
+ return elem;
+ }
+ return null;
+ }
+
+ /**
+ * Returns this sequence's comparator.
+ *
+ * @return this sequence's comparator.
+ */
+ public SequenceElementComparator<E> comparator() {
+ return comparator;
+ }
+
+ /**
+ * Returns the next higher element in the sequence to the given element. If
+ * the given element doesn't exist or if it is the last element in the
+ * sequence <code>null</code> is returned. <strong>Please note that this
+ * method is provided for compatibility with Java 5 SE. On a Java 6 SE
+ * platform the same method implemented by the {@link TreeSet}
+ * class should be used for better performance.</strong>
+ *
+ * @param e an element which is compared to elements of this sequence.
+ * @return an element of this sequence or <code>null</code>.
+ */
+ public E higher(E e) {
+ boolean found = false;
+ for (E current : this) {
+ if (found) {
+ return current;
+ }
+ if (comparator.compare(e, current) == 0) {
+ found = true;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns the next lower element in the sequence to the given element. If
+ * the given element doesn't exist or if it is the first element in the
+ * sequence <code>null</code> is returned. <strong>Please note that this
+ * method is provided for compatibility with Java 5 SE. On a Java 6 SE
+ * platform the same method implemented by the {@link TreeSet}
+ * class should be used for better performance.</strong>
+ *
+ * @param e an element which is compared to elements of this sequence.
+ * @return an element of this sequence or <code>null</code>.
+ */
+ public E lower(E e) {
+ E last = null;
+ for (E current : this) {
+ if (comparator.compare(e, current) == 0) {
+ return last;
+ }
+ last = current;
+ }
+ return last;
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceElementComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.Comparator;
+
+/**
+ * A strategy for comparing elements of a sequence.
+ *
+ * @author Martin Krasser
+ */
+public interface SequenceElementComparator<E> extends Comparator<E> {
+
+ /**
+ * Returns <code>true</code> if <code>o1</code> is an immediate predecessor
+ * of <code>o2</code>.
+ *
+ * @param o1 a sequence element.
+ * @param o2 a sequence element.
+ */
+ boolean predecessor(E o1, E o2);
+
+ /**
+ * Returns <code>true</code> if <code>o1</code> is an immediate successor
+ * of <code>o2</code>.
+ *
+ * @param o1 a sequence element.
+ * @param o2 a sequence element.
+ */
+ boolean successor(E o1, E o2);
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceReader.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.BlockingQueue;
+
+import javax.jbi.messaging.MessageExchange;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.executors.Executor;
+
+/**
+ * @author Martin Krasser
+ */
+public class SequenceReader implements Runnable {
+
+ private static final Log LOG = LogFactory.getLog(SequenceReader.class);
+
+ private static final MessageExchange STOP = createStopSignal();
+
+ private BlockingQueue<MessageExchange> queue;
+
+ private SequenceSender sender;
+
+ public SequenceReader(SequenceSender sender) {
+ this.sender = sender;
+ }
+
+ public void setQueue(BlockingQueue<MessageExchange> queue) {
+ this.queue = queue;
+ }
+
+ public void run() {
+ while (true) {
+ try {
+ // block until message exchange is available
+ MessageExchange me = queue.take();
+ if (me == STOP) {
+ LOG.info("exit processing loop after cancellation");
+ return;
+ }
+ // send sync to preserve message order
+ sender.sendSync(me);
+ } catch (InterruptedException e) {
+ LOG.info("exit processing loop after interrupt");
+ return;
+ } catch (Exception e) {
+ // TODO: handle sendSync errors and faults
+ LOG.error("caught and ignored exception", e);
+ }
+ }
+ }
+
+ public void start(Executor executor) {
+ executor.execute(this);
+ }
+
+ public void stop() throws InterruptedException {
+ queue.put(STOP);
+ }
+
+ private static MessageExchange createStopSignal() {
+ return (MessageExchange)Proxy.newProxyInstance(SequenceReader.class.getClassLoader(),
+ new Class[] {MessageExchange.class}, createStopHandler());
+ }
+
+ private static InvocationHandler createStopHandler() {
+ return new InvocationHandler() {
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ throw new IllegalStateException("illegal method invocation on stop signal");
+ }
+ };
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/SequenceSender.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.List;
+
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+
+/**
+ * A sender for synchronously sending message exchanges.
+ *
+ * @author Martin Krasser
+ */
+public interface SequenceSender {
+
+ /**
+ * Synchronously sends a single message exchange.
+ *
+ * @param exchange a message exchange.
+ * @throws MessagingException if a system-level error occurs.
+ */
+ void sendSync(MessageExchange exchange) throws MessagingException;
+
+ /**
+ * Synchronously sends a list of message exchanges in the order given by the
+ * argument list.
+ *
+ * @param exchanges a list of message exchanges.
+ * @throws MessagingException if a system-level error occurs.
+ */
+ void sendSync(List<MessageExchange> exchanges) throws MessagingException;
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/Timeout.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * A timer task that notifies handlers about scheduled timeouts.
+ *
+ * @see Timer
+ * @see TimerTask
+ *
+ * @author Martin Krasser
+ */
+public class Timeout extends TimerTask {
+
+ private List<TimeoutHandler> timeoutHandlers;
+
+ private Timer timer;
+
+ private long timeout;
+
+ /**
+ * Creates a new timeout task using the given {@link Timer} instance a timeout value. The
+ * task is not scheduled immediately. It will be scheduled by calling this
+ * task's {@link #schedule()} method.
+ *
+ * @param timer
+ * @param timeout
+ */
+ public Timeout(Timer timer, long timeout) {
+ this.timeoutHandlers = new LinkedList<TimeoutHandler>();
+ this.timeout = timeout;
+ this.timer = timer;
+ }
+
+ /**
+ * Returns the list of timeout handlers that have been registered for
+ * notification.
+ *
+ * @return the list of timeout handlers
+ */
+ public List<TimeoutHandler> getTimeoutHandlers() {
+ return timeoutHandlers;
+ }
+
+ /**
+ * Appends a new timeout handler at the end of the timeout handler list.
+ *
+ * @param handler a timeout handler.
+ */
+ public void addTimeoutHandler(TimeoutHandler handler) {
+ timeoutHandlers.add(handler);
+ }
+
+ /**
+ * inserts a new timeout handler at the beginning of the timeout handler
+ * list.
+ *
+ * @param handler a timeout handler.
+ */
+ public void addTimeoutHandlerFirst(TimeoutHandler handler) {
+ timeoutHandlers.add(0, handler);
+ }
+
+ /**
+ * Removes all timeout handlers from the timeout handler list.
+ */
+ public void clearTimeoutHandlers() {
+ this.timeoutHandlers.clear();
+ }
+
+ /**
+ * Schedules this timeout task.
+ */
+ public void schedule() {
+ timer.schedule(this, timeout);
+ }
+
+ /**
+ * Notifies all timeout handlers about the scheduled timeout.
+ */
+ @Override
+ public void run() {
+ for (TimeoutHandler observer : timeoutHandlers) {
+ observer.timeout(this);
+ }
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/main/java/org/apache/servicemix/eip/support/resequence/TimeoutHandler.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+/**
+ * Implemented by classes that handle timeout notifications.
+ *
+ * @author Martin Krasser
+ */
+public interface TimeoutHandler {
+
+ /**
+ * Handles a timeout notification.
+ *
+ * @param timeout the timer task that caused this timeout notification.
+ */
+ void timeout(Timeout timeout);
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.Resequencer;
+import org.apache.servicemix.eip.support.resequence.DefaultComparator;
+import org.apache.servicemix.tck.MessageList;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class ResequencerTest extends AbstractEIPTest {
+
+ private static final String RESEQUENCER_NAME = "resequencer";
+ private static final String TARGET_NAME = "target";
+ private static final String SEQNUM_KEY = "seqnum";
+
+ private Resequencer resequencer;
+
+ public void setUp() throws Exception {
+ super.setUp();
+ DefaultComparator comparator = new DefaultComparator();
+ comparator.setSequenceNumberAsString(false);
+ comparator.setSequenceNumberKey(SEQNUM_KEY);
+ resequencer = new Resequencer();
+ resequencer.setTarget(createServiceExchangeTarget(new QName(TARGET_NAME)));
+ resequencer.setComparator(comparator);
+ resequencer.setCapacity(100);
+ resequencer.setTimeout(500L);
+ configurePattern(resequencer);
+ activateComponent(resequencer, RESEQUENCER_NAME);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testAsync() throws Exception {
+ int numMessages = 5;
+ ReceiverComponent receiver = activateReceiver(TARGET_NAME);
+ client.send(createTestMessageExchange(4));
+ client.send(createTestMessageExchange(1));
+ client.send(createTestMessageExchange(3));
+ client.send(createTestMessageExchange(5));
+ client.send(createTestMessageExchange(2));
+ MessageList ml = receiver.getMessageList();
+ ml.waitForMessagesToArrive(numMessages);
+ assertEquals("wrong number of messages", numMessages, ml.getMessageCount());
+ for (int i = 0; i < numMessages; i++) {
+ assertSequenceProperties((NormalizedMessage)ml.getMessages().get(i), i + 1);
+ }
+ for (int i = 0; i < numMessages; i++) {
+ MessageExchange me = (InOnly)client.receive();
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ }
+ }
+
+ private MessageExchange createTestMessageExchange(long num) throws Exception {
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName(RESEQUENCER_NAME));
+ me.getInMessage().setProperty(SEQNUM_KEY, new Long(num));
+ return me;
+ }
+
+ private static void assertSequenceProperties(NormalizedMessage m, long num) {
+ Long l = (Long)m.getProperty(SEQNUM_KEY);
+ assertEquals("wrong sequence number", num, l.longValue());
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/ResequencerTxTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip;
+
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.namespace.QName;
+
+import org.apache.servicemix.eip.patterns.Resequencer;
+import org.apache.servicemix.eip.support.resequence.DefaultComparator;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.MessageList;
+import org.apache.servicemix.tck.ReceiverComponent;
+
+public class ResequencerTxTest extends AbstractEIPTransactionalTest {
+
+ private static final String RESEQUENCER_NAME = "resequencer";
+ private static final String TARGET_NAME = "target";
+ private static final String SEQNUM_KEY = "seqnum";
+
+ private Resequencer resequencer;
+
+ public void setUp() throws Exception {
+ super.setUp();
+ DefaultComparator comparator = new DefaultComparator();
+ comparator.setSequenceNumberAsString(false);
+ comparator.setSequenceNumberKey(SEQNUM_KEY);
+ resequencer = new Resequencer();
+ resequencer.setTarget(createServiceExchangeTarget(new QName(TARGET_NAME)));
+ resequencer.setComparator(comparator);
+ resequencer.setCapacity(100);
+ resequencer.setTimeout(1500L);
+ configurePattern(resequencer);
+ activateComponent(resequencer, RESEQUENCER_NAME);
+ }
+
+ public void testAsyncTx() throws Exception {
+ int numMessages = 5;
+ ReceiverComponent receiver = activateReceiver(TARGET_NAME);
+ tm.begin();
+ client.send(createTestMessageExchange(4));
+ client.send(createTestMessageExchange(1));
+ client.send(createTestMessageExchange(3));
+ client.send(createTestMessageExchange(5));
+ client.send(createTestMessageExchange(2));
+ tm.commit();
+ MessageList ml = receiver.getMessageList();
+ ml.waitForMessagesToArrive(numMessages);
+ assertEquals("wrong number of messages", numMessages, ml.getMessageCount());
+ for (int i = 0; i < numMessages; i++) {
+ assertSequenceProperties((NormalizedMessage)ml.getMessages().get(i), i + 1);
+ }
+ for (int i = 0; i < numMessages; i++) {
+ MessageExchange me = (InOnly)client.receive();
+ assertEquals(ExchangeStatus.DONE, me.getStatus());
+ }
+ }
+
+ private MessageExchange createTestMessageExchange(long num) throws Exception {
+ InOnly me = client.createInOnlyExchange();
+ me.setService(new QName(RESEQUENCER_NAME));
+ me.getInMessage().setProperty(SEQNUM_KEY, new Long(num));
+ me.getInMessage().setContent(new StringSource("<number>" + num + "</number>"));
+ return me;
+ }
+
+ private static void assertSequenceProperties(NormalizedMessage m, long num) throws Exception {
+ Long l = (Long)m.getProperty(SEQNUM_KEY);
+ if (l == null) {
+ // get sequence number from message content
+ long n = getNumber(new SourceTransformer().toString(m.getContent()));
+ assertEquals("wrong sequence number", num, n);
+ // TODO: investigate JDK 1.6.0_01 issues here
+ // When using JDK 1.6.0_01 then messages transported to receiver
+ // conponent don't have any properties set. This is only the case
+ // if this test is running with all other EIP tests. When running
+ // alone, messages contain properties as expected. When using
+ // JDK 1.5.0_12 messages always contain properties as expected.
+ } else {
+ // get sequence number from message properties
+ assertEquals("wrong sequence number", num, l.longValue());
+ }
+ }
+
+ private static long getNumber(String content) {
+ int idx1 = content.indexOf("<number>") + 8;
+ int idx2 = content.indexOf("</number>");
+ return Long.parseLong(content.substring(idx1, idx2));
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/IntegerComparator.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+public class IntegerComparator implements SequenceElementComparator<Integer> {
+
+ public boolean predecessor(Integer o1, Integer o2) {
+ return o1.intValue() == (o2.intValue() - 1);
+ }
+
+ public boolean successor(Integer o1, Integer o2) {
+ return o2.intValue() == (o1.intValue() - 1);
+ }
+
+ public int compare(Integer o1, Integer o2) {
+ return o1.compareTo(o2);
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/ResequencerEngineTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+public class ResequencerEngineTest extends TestCase {
+
+ private static final boolean IGNORE_LOAD_TESTS = true;
+
+ private ResequencerEngine<Integer> resequencer;
+
+ private LinkedBlockingQueue<Integer> queue;
+
+ public void setUp() throws Exception {
+ }
+
+ public void tearDown() throws Exception {
+ if (resequencer != null) {
+ resequencer.stop();
+ }
+ }
+
+ public void testTimeout1() throws InterruptedException {
+ initResequencer(500, 10);
+ resequencer.put(4);
+ assertNull(queue.poll(250, TimeUnit.MILLISECONDS));
+ assertEquals((Integer)4, queue.take());
+ assertEquals((Integer)4, resequencer.getLastDelivered());
+ }
+
+ public void testTimeout2() throws InterruptedException {
+ initResequencer(500, 10);
+ resequencer.setLastDelivered(2);
+ resequencer.put(4);
+ assertNull(queue.poll(250, TimeUnit.MILLISECONDS));
+ assertEquals((Integer)4, queue.take());
+ assertEquals((Integer)4, resequencer.getLastDelivered());
+ }
+
+ public void testTimeout3() throws InterruptedException {
+ initResequencer(500, 10);
+ resequencer.setLastDelivered(3);
+ resequencer.put(4);
+ assertEquals((Integer)4, queue.poll(250, TimeUnit.MILLISECONDS));
+ assertEquals((Integer)4, resequencer.getLastDelivered());
+ }
+
+ public void testTimout4() throws InterruptedException {
+ initResequencer(500, 10);
+ resequencer.setLastDelivered(2);
+ resequencer.put(4);
+ resequencer.put(3);
+ assertEquals((Integer)3, queue.poll(125, TimeUnit.MILLISECONDS));
+ assertEquals((Integer)4, queue.poll(125, TimeUnit.MILLISECONDS));
+ assertEquals((Integer)4, resequencer.getLastDelivered());
+ }
+
+ public void testRandom() throws InterruptedException {
+ if (IGNORE_LOAD_TESTS) {
+ return;
+ }
+ int input = 1000;
+ initResequencer(1000, 1000);
+ List<Integer> list = new LinkedList<Integer>();
+ for (int i = 0; i < input; i++) {
+ list.add(i);
+ }
+ Random random = new Random(System.currentTimeMillis());
+ System.out.println("Input sequence:");
+ long millis = System.currentTimeMillis();
+ for (int i = input; i > 0; i--) {
+ int r = random.nextInt(i);
+ int next = list.remove(r);
+ System.out.print(next + " ");
+ resequencer.put(next);
+ }
+ System.out.println("\nOutput sequence:");
+ for (int i = 0; i < input; i++) {
+ System.out.print(queue.take() + " ");
+ }
+ millis = System.currentTimeMillis() - millis;
+ System.out.println("\nDuration = " + millis + " ms");
+ }
+
+ public void testReverse1() throws InterruptedException {
+ if (IGNORE_LOAD_TESTS) {
+ return;
+ }
+ testReverse(10);
+ }
+
+ public void testReverse2() throws InterruptedException {
+ if (IGNORE_LOAD_TESTS) {
+ return;
+ }
+ testReverse(100);
+ }
+
+ private void testReverse(int capacity) throws InterruptedException {
+ initResequencer(1, capacity);
+ for (int i = 99; i >= 0; i--) {
+ resequencer.put(i);
+ }
+ System.out.println("\nOutput sequence:");
+ for (int i = 0; i < 100; i++) {
+ System.out.print(queue.take() + " ");
+ }
+ }
+
+ private void initResequencer(long timeout, int capacity) {
+ queue = new LinkedBlockingQueue<Integer>();
+ resequencer = new ResequencerEngine<Integer>(new IntegerComparator(), capacity);
+ resequencer.setOutQueue(queue);
+ resequencer.setTimeout(timeout);
+ }
+
+}
Added: incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java?view=auto&rev=561660
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java (added)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-eip/src/test/java/org/apache/servicemix/eip/support/resequence/SequenceTest.java Tue Jul 31 22:03:17 2007
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.eip.support.resequence;
+
+import junit.framework.TestCase;
+
+public class SequenceTest extends TestCase {
+
+ private TestObject e1;
+ private TestObject e2;
+ private TestObject e3;
+
+ private Sequence<TestObject> set;
+
+ public void setUp() throws Exception {
+ e1 = new TestObject(3);
+ e2 = new TestObject(4);
+ e3 = new TestObject(7);
+ set = new Sequence<TestObject>(new TestComparator());
+ set.add(e3);
+ set.add(e1);
+ set.add(e2);
+ }
+
+ public void tearDown() throws Exception {
+ }
+
+ public void testPredecessor() {
+ assertEquals(e1, set.predecessor(e2));
+ assertEquals(null, set.predecessor(e1));
+ assertEquals(null, set.predecessor(e3));
+ }
+
+ public void testSuccessor() {
+ assertEquals(e2, set.successor(e1));
+ assertEquals(null, set.successor(e2));
+ assertEquals(null, set.successor(e3));
+ }
+
+}