You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2018/07/17 02:36:35 UTC
[3/4] james-project git commit: JAMES-2167 Serializable attributes
are not preserved by enqueue/dequeue on a JMS queue.
JAMES-2167 Serializable attributes are not preserved by enqueue/dequeue on a JMS queue.
Non-String email attributes are now serialized/deserialized properly.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/46ea987a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/46ea987a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/46ea987a
Branch: refs/heads/master
Commit: 46ea987a02583ae19ccac6c9f1c7952e343a0d74
Parents: b4065d5
Author: Edgar Asatryan <ns...@gmail.com>
Authored: Sat Jul 7 00:03:44 2018 +0400
Committer: benwa <bt...@linagora.com>
Committed: Tue Jul 17 09:35:56 2018 +0700
----------------------------------------------------------------------
.../james/queue/api/MailQueueContract.java | 46 ++++++++
.../apache/james/queue/jms/JMSMailQueue.java | 79 ++++++--------
.../james/queue/jms/JMSSerializationUtils.java | 66 ++++++++++++
.../james/queue/jms/JMSMailQueueTest.java | 27 +----
.../queue/jms/JMSSerializationUtilsTest.java | 107 +++++++++++++++++++
5 files changed, 252 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
index 975d8ed..205c58b 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/MailQueueContract.java
@@ -28,7 +28,9 @@ import static org.apache.mailet.base.MailAddressFixture.SENDER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.Serializable;
import java.util.Date;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -44,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import com.github.fge.lambdas.Throwing;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
@ExtendWith(ExecutorExtension.class)
@@ -203,6 +206,20 @@ public interface MailQueueContract {
}
@Test
+ default void queueShouldPreserveNonStringMailAttribute() throws Exception {
+ String attributeName = "any";
+ SerializableAttribute attributeValue = new SerializableAttribute("value");
+ getMailQueue().enQueue(defaultMail()
+ .attribute(attributeName, attributeValue)
+ .build());
+
+ MailQueue.MailQueueItem mailQueueItem = getMailQueue().deQueue();
+ assertThat(mailQueueItem.getMail().getAttribute(attributeName))
+ .isInstanceOf(SerializableAttribute.class)
+ .isEqualTo(attributeValue);
+ }
+
+ @Test
default void dequeueShouldBeFifo() throws Exception {
String firstExpectedName = "name1";
getMailQueue().enQueue(defaultMail()
@@ -305,4 +322,33 @@ public interface MailQueueContract {
assertThat(tryDequeue.get().getMail().getName()).isEqualTo("name");
}
+ class SerializableAttribute implements Serializable {
+ private final String value;
+
+ public SerializableAttribute(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public final boolean equals(Object o) {
+ if (o instanceof SerializableAttribute) {
+ SerializableAttribute that = (SerializableAttribute) o;
+
+ return Objects.equals(this.value, that.value);
+ }
+ return false;
+ }
+
+ @Override
+ public final int hashCode() {
+ return Objects.hash(value);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("value", value)
+ .toString();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
index 08e9e11..f31a23b 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
@@ -20,7 +20,6 @@ package org.apache.james.queue.jms;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@@ -28,7 +27,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -51,9 +49,7 @@ import javax.mail.MessagingException;
import javax.mail.internet.AddressException;
import javax.mail.internet.MimeMessage;
-import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.iterators.EnumerationIterator;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.james.core.MailAddress;
import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.metrics.api.Metric;
@@ -73,6 +69,7 @@ import org.threeten.extra.Temporals;
import com.github.fge.lambdas.Throwing;
import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
import com.google.common.collect.Iterators;
/**
@@ -153,6 +150,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
protected final Queue queue;
protected final MessageProducer producer;
+ private final Joiner joiner;
+ private final Splitter splitter;
+
public JMSMailQueue(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, String queueName, MetricFactory metricFactory) {
try {
connection = connectionFactory.createConnection();
@@ -166,6 +166,10 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
this.enqueuedMailsMetric = metricFactory.generate("enqueuedMail:" + queueName);
this.mailQueueSize = metricFactory.generate("mailQueueSize:" + queueName);
+ this.joiner = Joiner.on(JAMES_MAIL_SEPARATOR).skipNulls();
+ this.splitter = Splitter.on(JAMES_MAIL_SEPARATOR)
+ .omitEmptyStrings() // ignore null values. See JAMES-1294
+ .trimResults();
try {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(queueName);
@@ -313,11 +317,10 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
// won't serialize the empty headers so it is mandatory
// to handle nulls when reconstructing mail from message
if (!mail.getPerRecipientSpecificHeaders().getHeadersByRecipient().isEmpty()) {
- byte[] serialize = SerializationUtils.serialize(mail.getPerRecipientSpecificHeaders());
- props.put(JAMES_MAIL_PER_RECIPIENT_HEADERS, Base64.encodeBase64String(serialize));
+ props.put(JAMES_MAIL_PER_RECIPIENT_HEADERS, JMSSerializationUtils.serialize(mail.getPerRecipientSpecificHeaders()));
}
- String recipientsAsString = Joiner.on(JAMES_MAIL_SEPARATOR).skipNulls().join(mail.getRecipients());
+ String recipientsAsString = joiner.join(mail.getRecipients());
props.put(JAMES_MAIL_RECIPIENTS, recipientsAsString);
props.put(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
@@ -325,22 +328,13 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
String sender = Optional.ofNullable(mail.getSender()).map(MailAddress::asString).orElse("");
- StringBuilder attrsBuilder = new StringBuilder();
- Iterator<String> attrs = mail.getAttributeNames();
- while (attrs.hasNext()) {
- String attrName = attrs.next();
- attrsBuilder.append(attrName);
-
- Object value = convertAttributeValue(mail.getAttribute(attrName));
- props.put(attrName, value);
+ org.apache.james.util.streams.Iterators.toStream(mail.getAttributeNames())
+ .forEach(attrName -> props.put(attrName, JMSSerializationUtils.serialize(mail.getAttribute(attrName))));
- if (attrs.hasNext()) {
- attrsBuilder.append(JAMES_MAIL_SEPARATOR);
- }
- }
- props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
+ props.put(JAMES_MAIL_ATTRIBUTE_NAMES, joiner.join(mail.getAttributeNames()));
props.put(JAMES_MAIL_SENDER, sender);
props.put(JAMES_MAIL_STATE, mail.getState());
+
return props;
}
@@ -392,10 +386,7 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED)));
mail.setName(message.getStringProperty(JAMES_MAIL_NAME));
- Optional.ofNullable(message.getStringProperty(JAMES_MAIL_PER_RECIPIENT_HEADERS))
- .map(String::getBytes)
- .map(Throwing.function(Base64::decodeBase64))
- .<PerRecipientHeaders>map(SerializationUtils::deserialize)
+ Optional.ofNullable(JMSSerializationUtils.<PerRecipientHeaders>deserialize(message.getStringProperty(JAMES_MAIL_PER_RECIPIENT_HEADERS)))
.ifPresent(mail::addAllSpecificHeaderForRecipient);
List<MailAddress> rcpts = new ArrayList<>();
@@ -417,23 +408,9 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
mail.setRemoteHost(message.getStringProperty(JAMES_MAIL_REMOTEHOST));
String attributeNames = message.getStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES);
- StringTokenizer namesTokenizer = new StringTokenizer(attributeNames, JAMES_MAIL_SEPARATOR);
- while (namesTokenizer.hasMoreTokens()) {
- String name = namesTokenizer.nextToken();
-
- // Now cast the property back to Serializable and set it as attribute.
- // See JAMES-1241
- Object attrValue = message.getObjectProperty(name);
-
- // ignore null values. See JAMES-1294
- if (attrValue != null) {
- if (attrValue instanceof Serializable) {
- mail.setAttribute(name, (Serializable) attrValue);
- } else {
- LOGGER.error("Not supported mail attribute {} of type {} for mail {}", name, attrValue, mail.getName());
- }
- }
- }
+
+ splitter.split(attributeNames)
+ .forEach(name -> setMailAttribute(message, mail, name));
String sender = message.getStringProperty(JAMES_MAIL_SENDER);
if (sender == null || sender.trim().length() <= 0) {
@@ -454,16 +431,22 @@ public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPriori
}
/**
- * Convert the attribute value if necessary.
+ * Retrieves the attribute by {@code name} from {@code message} and tries to add it on {@code mail}.
*
- * @param value
- * @return convertedValue
+ * @param message The attribute source.
+ * @param mail The mail on which attribute should be set.
+ * @param name The attribute name.
*/
- protected Object convertAttributeValue(Object value) {
- if (value == null || value instanceof String || value instanceof Byte || value instanceof Long || value instanceof Double || value instanceof Boolean || value instanceof Integer || value instanceof Short || value instanceof Float) {
- return value;
+ private void setMailAttribute(Message message, Mail mail, String name) {
+ // Now cast the property back to Serializable and set it as attribute.
+ // See JAMES-1241
+ Object attrValue = Throwing.function(message::getObjectProperty).apply(name);
+
+ if (attrValue instanceof String) {
+ mail.setAttribute(name, JMSSerializationUtils.deserialize((String) attrValue));
+ } else {
+ LOGGER.error("Not supported mail attribute {} of type {} for mail {}", name, attrValue, mail.getName());
}
- return value.toString();
}
@Override
http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java
new file mode 100644
index 0000000..98a0ac7
--- /dev/null
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSSerializationUtils.java
@@ -0,0 +1,66 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.SerializationUtils;
+
+import com.github.fge.lambdas.Throwing;
+
+/**
+ * This class is similar to {@link SerializationUtils}. Unlike {@link SerializationUtils} this class operates with
+ * {@code String}s and not byte arrays.
+ * <p>
+ * The main advantage of this utility is that it introduces an additional operation after serialization and before
+ * deserialization. The operation consists in encoding/decoding the serialized/deserialized data in Base64, so that data
+ * can be safely transmitted over the wire.
+ */
+public class JMSSerializationUtils {
+ /**
+ * Serializes the input object using standard mechanisms, then encodes the result using base64 encoding.
+ *
+ * @param obj The object that needs to be serialized.
+ *
+ * @return The base64 representation of {@code obj}.
+ */
+ public static String serialize(Serializable obj) {
+ return Optional.ofNullable(obj)
+ .map(SerializationUtils::serialize)
+ .map(Base64::encodeBase64String)
+ .orElse(null);
+ }
+
+ /**
+ * Decodes the input base64 string and deserializes it.
+ *
+ * @param <T> The resulting type after deserialization.
+ * @param object The base64 encoded string.
+ *
+ * @return The deserialized object.
+ */
+ public static <T extends Serializable> T deserialize(String object) {
+ return Optional.ofNullable(object)
+ .map(Throwing.function(Base64::decodeBase64))
+ .<T>map(SerializationUtils::deserialize)
+ .orElse(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
index aa6a6e4..826f42b 100644
--- a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSMailQueueTest.java
@@ -19,8 +19,6 @@
package org.apache.james.queue.jms;
-import java.util.concurrent.ExecutorService;
-
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -44,7 +42,7 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
private JMSMailQueue mailQueue;
@BeforeEach
- public void setUp(BrokerService broker) throws Exception {
+ void setUp(BrokerService broker) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory();
NoopMetricFactory metricFactory = new NoopMetricFactory();
@@ -53,7 +51,7 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
}
@AfterEach
- public void tearDown() throws Exception {
+ void tearDown() {
mailQueue.dispose();
}
@@ -83,20 +81,6 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
@Test
@Override
- @Disabled("JAMES-2301 Per recipients headers are not attached to the message.")
- public void queueShouldPreservePerRecipientHeaders() {
-
- }
-
- @Test
- @Override
- @Disabled("JAMES-2296 Not handled by JMS mailqueue. Only single recipient per-recipient removal works")
- public void removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients() {
-
- }
-
- @Test
- @Override
@Disabled("JAMES-2308 Flushing JMS mail queue randomly re-order them" +
"Random test failing around 1% of the time")
public void flushShouldPreserveBrowseOrder() {
@@ -105,13 +89,6 @@ public class JMSMailQueueTest implements DelayedManageableMailQueueContract, Pri
@Test
@Override
- @Disabled("JAMES-2309 Long overflow in JMS delays")
- public void enqueueWithVeryLongDelayShouldDelayMail(ExecutorService executorService) {
-
- }
-
- @Test
- @Override
@Disabled("JAMES-2312 JMS clear mailqueue can ommit some messages" +
"Random test failing around 1% of the time")
public void clearShouldRemoveAllElements() {
http://git-wip-us.apache.org/repos/asf/james-project/blob/46ea987a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java
new file mode 100644
index 0000000..6ab010f
--- /dev/null
+++ b/server/queue/queue-jms/src/test/java/org/apache/james/queue/jms/JMSSerializationUtilsTest.java
@@ -0,0 +1,107 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+import static org.apache.james.queue.jms.JMSSerializationUtils.deserialize;
+import static org.apache.james.queue.jms.JMSSerializationUtils.serialize;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.SerializationException;
+import org.junit.jupiter.api.Test;
+
+class JMSSerializationUtilsTest {
+ /**
+ * Serializes and deserializes the provided object.
+ *
+ * @param obj The object that needs to be serialized.
+ * @param <T> The type of the provided object.
+ *
+ * @return The provided object.
+ */
+ private static <T extends Serializable> T roundtrip(T obj) {
+ return Optional.ofNullable(obj)
+ .map(JMSSerializationUtils::serialize)
+ .<T>map(JMSSerializationUtils::deserialize)
+ .orElseThrow(() -> new IllegalArgumentException("Cannot serialize/deserialize: " + obj));
+ }
+
+ @Test
+ void trySerializeShouldReturnString() {
+ SerializableStringHolder value = new SerializableStringHolder("value");
+
+ String serializedIntegerString = "rO0ABXNyAE1vcmcuYXBhY2hlLmphbWVzLnF1ZXVlLmptcy5KTVNTZXJpYWxp" +
+ "emF0aW9uVXRpbHNUZXN0JFNlcmlhbGl6YWJsZVN0cmluZ0hvbGRlcsy4/DEA" +
+ "8nRZAgABTAAFdmFsdWV0ABJMamF2YS9sYW5nL1N0cmluZzt4cHQABXZhbHVl";
+
+ String actual = serialize(value);
+
+ assertThat(actual).isEqualTo(serializedIntegerString);
+ }
+
+ @Test
+ void roundTripShouldReturnEqualObject() {
+ SerializableStringHolder expected = new SerializableStringHolder("value");
+
+ assertThat(roundtrip(expected)).isEqualTo(expected);
+ }
+
+ @Test
+ void deserializeShouldThrowWhenNotBase64StringProvided() {
+ assertThatExceptionOfType(SerializationException.class)
+ .isThrownBy(() -> deserialize("abc"));
+ }
+
+ @Test
+ void deserializeShouldThrowWhenNotSerializedBytesAreEncodedInBase64() {
+ assertThatExceptionOfType(SerializationException.class)
+ .isThrownBy(() -> deserialize(Base64.encodeBase64String("abc".getBytes(StandardCharsets.UTF_8))));
+ }
+
+ private static class SerializableStringHolder implements Serializable {
+ private final String value;
+
+ SerializableStringHolder(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SerializableStringHolder)) {
+ return false;
+ }
+ SerializableStringHolder that = (SerializableStringHolder) o;
+ return Objects.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org