You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2015/07/03 16:38:23 UTC
svn commit: r1689023 - /james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
Author: btellier
Date: Fri Jul 3 14:38:22 2015
New Revision: 1689023
URL: http://svn.apache.org/r1689023
Log:
MAILBOX-208 Reorder methods in Cassandra Message Mapper
Modified:
james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1689023&r1=1689022&r2=1689023&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java (original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java Fri Jul 3 14:38:22 2015
@@ -162,26 +162,6 @@ public class CassandraMessageMapper impl
}
}
- private void decrementCount(Mailbox<CassandraId> mailbox) {
- updateMailbox(mailbox, decr(CassandraMailboxCountersTable.COUNT));
- }
-
- private void incrementCount(Mailbox<CassandraId> mailbox) {
- updateMailbox(mailbox, incr(CassandraMailboxCountersTable.COUNT));
- }
-
- private void decrementUnseen(Mailbox<CassandraId> mailbox) {
- updateMailbox(mailbox, decr(CassandraMailboxCountersTable.UNSEEN));
- }
-
- private void incrementUnseen(Mailbox<CassandraId> mailbox) {
- updateMailbox(mailbox, incr(CassandraMailboxCountersTable.UNSEEN));
- }
-
- private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment operation) {
- session.execute(update(CassandraMailboxCountersTable.TABLE_NAME).with(operation).where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid())));
- }
-
@Override
public Iterator<Message<CassandraId>> findInMailbox(Mailbox<CassandraId> mailbox, MessageRange set, FetchType ftype, int max) throws MailboxException {
return convertToStream(session.execute(buildQuery(mailbox, set)))
@@ -189,94 +169,6 @@ public class CassandraMessageMapper impl
.iterator();
}
- private byte[] getFullContent(Row row) {
- byte[] headerContent = new byte[row.getBytes(HEADER_CONTENT).remaining()];
- byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()];
- row.getBytes(HEADER_CONTENT).get(headerContent);
- row.getBytes(BODY_CONTENT).get(bodyContent);
- return Bytes.concat(headerContent, bodyContent);
- }
-
- private Flags getFlags(Row row) {
- return Arrays.stream(CassandraMessageTable.Flag.ALL)
- .filter(row::getBool)
- .map(JAVAX_MAIL_FLAG::get)
- .reduce(new Flags(), (flags, flag) -> {
- flags.add(flag);
- return flags;
- }, (flags1, flags2) -> {
- flags1.add(flags2);
- return flags1;
- });
- }
-
- private PropertyBuilder getPropertyBuilder(Row row) {
- PropertyBuilder property = new PropertyBuilder(
- row.getList(PROPERTIES, UDTValue.class).stream()
- .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
- .collect(Collectors.toList()));
- property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
- return property;
- }
-
- private Message<CassandraId> message(Row row) {
- SimpleMessage<CassandraId> message =
- new SimpleMessage<>(
- row.getDate(INTERNAL_DATE),
- row.getInt(FULL_CONTENT_OCTETS),
- row.getInt(BODY_START_OCTET),
- new SharedByteArrayInputStream(getFullContent(row)),
- getFlags(row),
- getPropertyBuilder(row),
- CassandraId.of(row.getUUID(MAILBOX_ID)));
- message.setUid(row.getLong(IMAP_UID));
- message.setModSeq(row.getLong(MOD_SEQ));
- return message;
- }
-
- private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) {
- final MessageRange.Type type = set.getType();
- switch (type) {
- case ALL:
- return selectAll(mailbox);
- case FROM:
- return selectFrom(mailbox, set.getUidFrom());
- case RANGE:
- return selectRange(mailbox, set.getUidFrom(), set.getUidTo());
- case ONE:
- return selectMessage(mailbox, set.getUidFrom());
- }
- throw new UnsupportedOperationException();
- }
-
- private Where selectAll(Mailbox<CassandraId> mailbox) {
- return select(FIELDS)
- .from(TABLE_NAME)
- .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()));
- }
-
- private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) {
- return select(FIELDS)
- .from(TABLE_NAME)
- .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
- .and(gte(IMAP_UID, uid));
- }
-
- private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to) {
- return select(FIELDS)
- .from(TABLE_NAME)
- .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
- .and(gte(IMAP_UID, from))
- .and(lte(IMAP_UID, to));
- }
-
- private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) {
- return select(FIELDS)
- .from(TABLE_NAME)
- .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
- .and(eq(IMAP_UID, uid));
- }
-
@Override
public List<Long> findRecentMessageUidsInMailbox(Mailbox<CassandraId> mailbox) throws MailboxException {
return convertToStream(session.execute(selectAll(mailbox).and((eq(RECENT, true)))))
@@ -318,11 +210,6 @@ public class CassandraMessageMapper impl
}
@Override
- public <T> T execute(Transaction<T> transaction) throws MailboxException {
- return transaction.run();
- }
-
- @Override
public MessageMetaData add(Mailbox<CassandraId> mailbox, Message<CassandraId> message) throws MailboxException {
message.setUid(uidProvider.nextUid(mailboxSession, mailbox));
message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
@@ -334,6 +221,107 @@ public class CassandraMessageMapper impl
return messageMetaData;
}
+ @Override
+ public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException {
+ return convertToStream(session.execute(buildQuery(mailbox, set)))
+ .map((row) -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, row))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, updatedFlags.getOldFlags(), updatedFlags.getNewFlags()))
+ .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result
+ .iterator();
+ }
+
+ @Override
+ public <T> T execute(Transaction<T> transaction) throws MailboxException {
+ return transaction.run();
+ }
+
+ @Override
+ public MessageMetaData copy(Mailbox<CassandraId> mailbox, Message<CassandraId> original) throws MailboxException {
+
+ original.setUid(uidProvider.nextUid(mailboxSession, mailbox));
+ original.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
+ incrementCount(mailbox);
+ if(!original.isSeen()) {
+ incrementUnseen(mailbox);
+ }
+ original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build());
+ return save(mailbox, original);
+ }
+
+ @Override
+ public long getLastUid(Mailbox<CassandraId> mailbox) throws MailboxException {
+ return uidProvider.lastUid(mailboxSession, mailbox);
+ }
+
+ private void decrementCount(Mailbox<CassandraId> mailbox) {
+ updateMailbox(mailbox, decr(CassandraMailboxCountersTable.COUNT));
+ }
+
+ private void incrementCount(Mailbox<CassandraId> mailbox) {
+ updateMailbox(mailbox, incr(CassandraMailboxCountersTable.COUNT));
+ }
+
+ private void decrementUnseen(Mailbox<CassandraId> mailbox) {
+ updateMailbox(mailbox, decr(CassandraMailboxCountersTable.UNSEEN));
+ }
+
+ private void incrementUnseen(Mailbox<CassandraId> mailbox) {
+ updateMailbox(mailbox, incr(CassandraMailboxCountersTable.UNSEEN));
+ }
+
+ private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment operation) {
+ session.execute(update(CassandraMailboxCountersTable.TABLE_NAME)
+ .with(operation)
+ .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid())));
+ }
+
+ private Message<CassandraId> message(Row row) {
+ SimpleMessage<CassandraId> message =
+ new SimpleMessage<>(
+ row.getDate(INTERNAL_DATE),
+ row.getInt(FULL_CONTENT_OCTETS),
+ row.getInt(BODY_START_OCTET),
+ new SharedByteArrayInputStream(getFullContent(row)),
+ getFlags(row),
+ getPropertyBuilder(row),
+ CassandraId.of(row.getUUID(MAILBOX_ID)));
+ message.setUid(row.getLong(IMAP_UID));
+ message.setModSeq(row.getLong(MOD_SEQ));
+ return message;
+ }
+
+ private byte[] getFullContent(Row row) {
+ byte[] headerContent = new byte[row.getBytes(HEADER_CONTENT).remaining()];
+ byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()];
+ row.getBytes(HEADER_CONTENT).get(headerContent);
+ row.getBytes(BODY_CONTENT).get(bodyContent);
+ return Bytes.concat(headerContent, bodyContent);
+ }
+
+ private Flags getFlags(Row row) {
+ return Arrays.stream(CassandraMessageTable.Flag.ALL)
+ .filter(row::getBool)
+ .map(JAVAX_MAIL_FLAG::get)
+ .reduce(new Flags(), (flags, flag) -> {
+ flags.add(flag);
+ return flags;
+ }, (flags1, flags2) -> {
+ flags1.add(flags2);
+ return flags1;
+ });
+ }
+
+ private PropertyBuilder getPropertyBuilder(Row row) {
+ PropertyBuilder property = new PropertyBuilder(
+ row.getList(PROPERTIES, UDTValue.class).stream()
+ .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
+ .collect(Collectors.toList()));
+ property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
+ return property;
+ }
+
private MessageMetaData save(Mailbox<CassandraId> mailbox, Message<CassandraId> message) throws MailboxException {
try {
Insert query = insertInto(TABLE_NAME)
@@ -372,38 +360,6 @@ public class CassandraMessageMapper impl
}
}
- private boolean conditionalSave(Message<CassandraId> message, long oldModSeq) {
- ResultSet resultSet = session.execute(
- update(TABLE_NAME)
- .with(set(ANSWERED, message.isAnswered()))
- .and(set(DELETED, message.isDeleted()))
- .and(set(DRAFT, message.isDraft()))
- .and(set(FLAGGED, message.isFlagged()))
- .and(set(RECENT, message.isRecent()))
- .and(set(SEEN, message.isSeen()))
- .and(set(USER, message.createFlags().contains(Flag.USER)))
- .and(set(MOD_SEQ, message.getModSeq()))
- .where(eq(IMAP_UID, message.getUid()))
- .and(eq(MAILBOX_ID, message.getMailboxId().asUuid()))
- .onlyIf(eq(MOD_SEQ, oldModSeq)));
- return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
- }
-
- private ByteBuffer toByteBuffer(InputStream stream) throws IOException {
- return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
- }
-
- @Override
- public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange set) throws MailboxException {
- return convertToStream(session.execute(buildQuery(mailbox, set)))
- .map((row) -> updateFlagsOnMessage(mailbox, flagsUpdateCalculator, row))
- .filter(Optional::isPresent)
- .map(Optional::get)
- .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, updatedFlags.getOldFlags(), updatedFlags.getNewFlags()))
- .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result
- .iterator();
- }
-
private void manageUnseenMessageCounts(Mailbox<CassandraId> mailbox, Flags oldFlags, Flags newFlags) {
if (oldFlags.contains(Flag.SEEN) && !newFlags.contains(Flag.SEEN)) {
incrementUnseen(mailbox);
@@ -455,27 +411,72 @@ public class CassandraMessageMapper impl
.orElseThrow(() -> new MessageDeletedDuringFlagsUpdate(mailbox.getMailboxId(), uid));
}
- @Override
- public MessageMetaData copy(Mailbox<CassandraId> mailbox, Message<CassandraId> original) throws MailboxException {
+ private boolean conditionalSave(Message<CassandraId> message, long oldModSeq) {
+ ResultSet resultSet = session.execute(
+ update(TABLE_NAME)
+ .with(set(ANSWERED, message.isAnswered()))
+ .and(set(DELETED, message.isDeleted()))
+ .and(set(DRAFT, message.isDraft()))
+ .and(set(FLAGGED, message.isFlagged()))
+ .and(set(RECENT, message.isRecent()))
+ .and(set(SEEN, message.isSeen()))
+ .and(set(USER, message.createFlags().contains(Flag.USER)))
+ .and(set(MOD_SEQ, message.getModSeq()))
+ .where(eq(IMAP_UID, message.getUid()))
+ .and(eq(MAILBOX_ID, message.getMailboxId().asUuid()))
+ .onlyIf(eq(MOD_SEQ, oldModSeq)));
+ return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
+ }
- original.setUid(uidProvider.nextUid(mailboxSession, mailbox));
- original.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
- incrementCount(mailbox);
- if(!original.isSeen()) {
- incrementUnseen(mailbox);
+ private ByteBuffer toByteBuffer(InputStream stream) throws IOException {
+ return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
+ }
+
+ private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) {
+ final MessageRange.Type type = set.getType();
+ switch (type) {
+ case ALL:
+ return selectAll(mailbox);
+ case FROM:
+ return selectFrom(mailbox, set.getUidFrom());
+ case RANGE:
+ return selectRange(mailbox, set.getUidFrom(), set.getUidTo());
+ case ONE:
+ return selectMessage(mailbox, set.getUidFrom());
}
- original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build());
- return save(mailbox, original);
+ throw new UnsupportedOperationException();
}
- @Override
- public long getLastUid(Mailbox<CassandraId> mailbox) throws MailboxException {
- return uidProvider.lastUid(mailboxSession, mailbox);
+ private Where selectAll(Mailbox<CassandraId> mailbox) {
+ return select(FIELDS)
+ .from(TABLE_NAME)
+ .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()));
+ }
+
+ private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) {
+ return select(FIELDS)
+ .from(TABLE_NAME)
+ .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
+ .and(gte(IMAP_UID, uid));
+ }
+
+ private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to) {
+ return select(FIELDS)
+ .from(TABLE_NAME)
+ .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
+ .and(gte(IMAP_UID, from))
+ .and(lte(IMAP_UID, to));
+ }
+
+ private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) {
+ return select(FIELDS)
+ .from(TABLE_NAME)
+ .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
+ .and(eq(IMAP_UID, uid));
}
private Stream<Row> convertToStream(ResultSet resultSet) {
return StreamSupport.stream(resultSet.spliterator(), true);
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org