You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2015/07/03 16:38:23 UTC

svn commit: r1689023 - /james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java

Author: btellier
Date: Fri Jul  3 14:38:22 2015
New Revision: 1689023

URL: http://svn.apache.org/r1689023
Log:
MAILBOX-208 Reorder methods in Cassandra Message Mapper

Modified:
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java?rev=1689023&r1=1689022&r2=1689023&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java (original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java Fri Jul  3 14:38:22 2015
@@ -162,26 +162,6 @@ public class CassandraMessageMapper impl
         }
     }
 
-    private void decrementCount(Mailbox<CassandraId> mailbox) {
-        updateMailbox(mailbox, decr(CassandraMailboxCountersTable.COUNT));
-    }
-
-    private void incrementCount(Mailbox<CassandraId> mailbox) {
-        updateMailbox(mailbox, incr(CassandraMailboxCountersTable.COUNT));
-    }
-
-    private void decrementUnseen(Mailbox<CassandraId> mailbox) {
-        updateMailbox(mailbox, decr(CassandraMailboxCountersTable.UNSEEN));
-    }
-
-    private void incrementUnseen(Mailbox<CassandraId> mailbox) {
-        updateMailbox(mailbox, incr(CassandraMailboxCountersTable.UNSEEN));
-    }
-
-    private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment operation) {
-        session.execute(update(CassandraMailboxCountersTable.TABLE_NAME).with(operation).where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid())));
-    }
-
     @Override
     public Iterator<Message<CassandraId>> findInMailbox(Mailbox<CassandraId> mailbox, MessageRange set, FetchType ftype, int max) throws MailboxException {
         return convertToStream(session.execute(buildQuery(mailbox, set)))
@@ -189,94 +169,6 @@ public class CassandraMessageMapper impl
             .iterator();
     }
 
-    private byte[] getFullContent(Row row) {
-        byte[] headerContent = new byte[row.getBytes(HEADER_CONTENT).remaining()];
-        byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()];
-        row.getBytes(HEADER_CONTENT).get(headerContent);
-        row.getBytes(BODY_CONTENT).get(bodyContent);
-        return Bytes.concat(headerContent, bodyContent);
-    }
-
-    private Flags getFlags(Row row) {
-        return Arrays.stream(CassandraMessageTable.Flag.ALL)
-            .filter(row::getBool)
-            .map(JAVAX_MAIL_FLAG::get)
-            .reduce(new Flags(), (flags, flag) -> {
-                    flags.add(flag);
-                    return flags;
-                }, (flags1, flags2) -> {
-                    flags1.add(flags2);
-                    return flags1;
-                });
-    }
-
-    private PropertyBuilder getPropertyBuilder(Row row) {
-        PropertyBuilder property = new PropertyBuilder(
-            row.getList(PROPERTIES, UDTValue.class).stream()
-                .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
-                .collect(Collectors.toList()));
-        property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
-        return property;
-    }
-
-    private Message<CassandraId> message(Row row) {
-        SimpleMessage<CassandraId> message =
-            new SimpleMessage<>(
-                row.getDate(INTERNAL_DATE),
-                row.getInt(FULL_CONTENT_OCTETS),
-                row.getInt(BODY_START_OCTET),
-                new SharedByteArrayInputStream(getFullContent(row)),
-                getFlags(row),
-                getPropertyBuilder(row),
-                CassandraId.of(row.getUUID(MAILBOX_ID)));
-        message.setUid(row.getLong(IMAP_UID));
-        message.setModSeq(row.getLong(MOD_SEQ));
-        return message;
-    }
-
-    private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) {
-        final MessageRange.Type type = set.getType();
-        switch (type) {
-        case ALL:
-            return selectAll(mailbox);
-        case FROM:
-            return selectFrom(mailbox, set.getUidFrom());
-        case RANGE:
-            return selectRange(mailbox, set.getUidFrom(), set.getUidTo());
-        case ONE:
-            return selectMessage(mailbox, set.getUidFrom());
-        }
-        throw new UnsupportedOperationException();
-    }
-
-    private Where selectAll(Mailbox<CassandraId> mailbox) {
-        return select(FIELDS)
-            .from(TABLE_NAME)
-            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()));
-    }
-
-    private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) {
-        return select(FIELDS)
-            .from(TABLE_NAME)
-            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
-            .and(gte(IMAP_UID, uid));
-    }
-
-    private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to) {
-        return select(FIELDS)
-            .from(TABLE_NAME)
-            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
-            .and(gte(IMAP_UID, from))
-            .and(lte(IMAP_UID, to));
-    }
-
-    private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) {
-        return select(FIELDS)
-            .from(TABLE_NAME)
-            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
-            .and(eq(IMAP_UID, uid));
-    }
-
     @Override
     public List<Long> findRecentMessageUidsInMailbox(Mailbox<CassandraId> mailbox) throws MailboxException {
         return convertToStream(session.execute(selectAll(mailbox).and((eq(RECENT, true)))))
@@ -318,11 +210,6 @@ public class CassandraMessageMapper impl
     }
 
     @Override
-    public <T> T execute(Transaction<T> transaction) throws MailboxException {
-        return transaction.run();
-    }
-
-    @Override
     public MessageMetaData add(Mailbox<CassandraId> mailbox, Message<CassandraId> message) throws MailboxException {
         message.setUid(uidProvider.nextUid(mailboxSession, mailbox));
         message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
@@ -334,6 +221,107 @@ public class CassandraMessageMapper impl
         return messageMetaData;
     }
 
+    @Override
+    public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException {
+        return convertToStream(session.execute(buildQuery(mailbox, set)))
+            .map((row) -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, row))
+            .filter(Optional::isPresent)
+            .map(Optional::get)
+            .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, updatedFlags.getOldFlags(), updatedFlags.getNewFlags()))
+            .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result
+            .iterator();
+    }
+
+    @Override
+    public <T> T execute(Transaction<T> transaction) throws MailboxException {
+        return transaction.run();
+    }
+
+    @Override
+    public MessageMetaData copy(Mailbox<CassandraId> mailbox, Message<CassandraId> original) throws MailboxException {
+
+        original.setUid(uidProvider.nextUid(mailboxSession, mailbox));
+        original.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
+        incrementCount(mailbox);
+        if(!original.isSeen()) {
+            incrementUnseen(mailbox);
+        }
+        original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build());
+        return save(mailbox, original);
+    }
+
+    @Override
+    public long getLastUid(Mailbox<CassandraId> mailbox) throws MailboxException {
+        return uidProvider.lastUid(mailboxSession, mailbox);
+    }
+
+    private void decrementCount(Mailbox<CassandraId> mailbox) {
+        updateMailbox(mailbox, decr(CassandraMailboxCountersTable.COUNT));
+    }
+
+    private void incrementCount(Mailbox<CassandraId> mailbox) {
+        updateMailbox(mailbox, incr(CassandraMailboxCountersTable.COUNT));
+    }
+
+    private void decrementUnseen(Mailbox<CassandraId> mailbox) {
+        updateMailbox(mailbox, decr(CassandraMailboxCountersTable.UNSEEN));
+    }
+
+    private void incrementUnseen(Mailbox<CassandraId> mailbox) {
+        updateMailbox(mailbox, incr(CassandraMailboxCountersTable.UNSEEN));
+    }
+
+    private void updateMailbox(Mailbox<CassandraId> mailbox, Assignment operation) {
+        session.execute(update(CassandraMailboxCountersTable.TABLE_NAME)
+            .with(operation)
+            .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, mailbox.getMailboxId().asUuid())));
+    }
+
+    private Message<CassandraId> message(Row row) {
+        SimpleMessage<CassandraId> message =
+            new SimpleMessage<>(
+                row.getDate(INTERNAL_DATE),
+                row.getInt(FULL_CONTENT_OCTETS),
+                row.getInt(BODY_START_OCTET),
+                new SharedByteArrayInputStream(getFullContent(row)),
+                getFlags(row),
+                getPropertyBuilder(row),
+                CassandraId.of(row.getUUID(MAILBOX_ID)));
+        message.setUid(row.getLong(IMAP_UID));
+        message.setModSeq(row.getLong(MOD_SEQ));
+        return message;
+    }
+
+    private byte[] getFullContent(Row row) {
+        byte[] headerContent = new byte[row.getBytes(HEADER_CONTENT).remaining()];
+        byte[] bodyContent = new byte[row.getBytes(BODY_CONTENT).remaining()];
+        row.getBytes(HEADER_CONTENT).get(headerContent);
+        row.getBytes(BODY_CONTENT).get(bodyContent);
+        return Bytes.concat(headerContent, bodyContent);
+    }
+
+    private Flags getFlags(Row row) {
+        return Arrays.stream(CassandraMessageTable.Flag.ALL)
+            .filter(row::getBool)
+            .map(JAVAX_MAIL_FLAG::get)
+            .reduce(new Flags(), (flags, flag) -> {
+                flags.add(flag);
+                return flags;
+            }, (flags1, flags2) -> {
+                flags1.add(flags2);
+                return flags1;
+            });
+    }
+
+    private PropertyBuilder getPropertyBuilder(Row row) {
+        PropertyBuilder property = new PropertyBuilder(
+            row.getList(PROPERTIES, UDTValue.class).stream()
+                .map(x -> new SimpleProperty(x.getString(Properties.NAMESPACE), x.getString(Properties.NAME), x.getString(Properties.VALUE)))
+                .collect(Collectors.toList()));
+        property.setTextualLineCount(row.getLong(TEXTUAL_LINE_COUNT));
+        return property;
+    }
+
     private MessageMetaData save(Mailbox<CassandraId> mailbox, Message<CassandraId> message) throws MailboxException {
         try {
             Insert query = insertInto(TABLE_NAME)
@@ -372,38 +360,6 @@ public class CassandraMessageMapper impl
         }
     }
 
-    private boolean conditionalSave(Message<CassandraId> message, long oldModSeq) {
-        ResultSet resultSet = session.execute(
-            update(TABLE_NAME)
-                .with(set(ANSWERED, message.isAnswered()))
-                .and(set(DELETED, message.isDeleted()))
-                .and(set(DRAFT, message.isDraft()))
-                .and(set(FLAGGED, message.isFlagged()))
-                .and(set(RECENT, message.isRecent()))
-                .and(set(SEEN, message.isSeen()))
-                .and(set(USER, message.createFlags().contains(Flag.USER)))
-                .and(set(MOD_SEQ, message.getModSeq()))
-                .where(eq(IMAP_UID, message.getUid()))
-                .and(eq(MAILBOX_ID, message.getMailboxId().asUuid()))
-                .onlyIf(eq(MOD_SEQ, oldModSeq)));
-        return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
-    }
-
-    private ByteBuffer toByteBuffer(InputStream stream) throws IOException {
-        return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
-    }
-
-    @Override
-    public Iterator<UpdatedFlags> updateFlags(Mailbox<CassandraId> mailbox, FlagsUpdateCalculator flagsUpdateCalculator, MessageRange set) throws MailboxException {
-        return convertToStream(session.execute(buildQuery(mailbox, set)))
-            .map((row) -> updateFlagsOnMessage(mailbox, flagsUpdateCalculator, row))
-            .filter(Optional::isPresent)
-            .map(Optional::get)
-            .peek((updatedFlags) -> manageUnseenMessageCounts(mailbox, updatedFlags.getOldFlags(), updatedFlags.getNewFlags()))
-            .collect(Collectors.toList()) // This collect is here as we need to consume all the stream before returning result
-            .iterator();
-    }
-
     private void manageUnseenMessageCounts(Mailbox<CassandraId> mailbox, Flags oldFlags, Flags newFlags) {
         if (oldFlags.contains(Flag.SEEN) && !newFlags.contains(Flag.SEEN)) {
             incrementUnseen(mailbox);
@@ -455,27 +411,72 @@ public class CassandraMessageMapper impl
             .orElseThrow(() -> new MessageDeletedDuringFlagsUpdate(mailbox.getMailboxId(), uid));
     }
 
-    @Override
-    public MessageMetaData copy(Mailbox<CassandraId> mailbox, Message<CassandraId> original) throws MailboxException {
+    private boolean conditionalSave(Message<CassandraId> message, long oldModSeq) {
+        ResultSet resultSet = session.execute(
+            update(TABLE_NAME)
+                .with(set(ANSWERED, message.isAnswered()))
+                .and(set(DELETED, message.isDeleted()))
+                .and(set(DRAFT, message.isDraft()))
+                .and(set(FLAGGED, message.isFlagged()))
+                .and(set(RECENT, message.isRecent()))
+                .and(set(SEEN, message.isSeen()))
+                .and(set(USER, message.createFlags().contains(Flag.USER)))
+                .and(set(MOD_SEQ, message.getModSeq()))
+                .where(eq(IMAP_UID, message.getUid()))
+                .and(eq(MAILBOX_ID, message.getMailboxId().asUuid()))
+                .onlyIf(eq(MOD_SEQ, oldModSeq)));
+        return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
+    }
 
-        original.setUid(uidProvider.nextUid(mailboxSession, mailbox));
-        original.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
-        incrementCount(mailbox);
-        if(!original.isSeen()) {
-            incrementUnseen(mailbox);
+    private ByteBuffer toByteBuffer(InputStream stream) throws IOException {
+        return ByteBuffer.wrap(ByteStreams.toByteArray(stream));
+    }
+
+    private Where buildQuery(Mailbox<CassandraId> mailbox, MessageRange set) {
+        final MessageRange.Type type = set.getType();
+        switch (type) {
+        case ALL:
+            return selectAll(mailbox);
+        case FROM:
+            return selectFrom(mailbox, set.getUidFrom());
+        case RANGE:
+            return selectRange(mailbox, set.getUidFrom(), set.getUidTo());
+        case ONE:
+            return selectMessage(mailbox, set.getUidFrom());
         }
-        original.setFlags(new FlagsBuilder().add(original.createFlags()).add(Flag.RECENT).build());
-        return save(mailbox, original);
+        throw new UnsupportedOperationException();
     }
 
-    @Override
-    public long getLastUid(Mailbox<CassandraId> mailbox) throws MailboxException {
-        return uidProvider.lastUid(mailboxSession, mailbox);
+    private Where selectAll(Mailbox<CassandraId> mailbox) {
+        return select(FIELDS)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()));
+    }
+
+    private Where selectFrom(Mailbox<CassandraId> mailbox, long uid) {
+        return select(FIELDS)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
+            .and(gte(IMAP_UID, uid));
+    }
+
+    private Where selectRange(Mailbox<CassandraId> mailbox, long from, long to) {
+        return select(FIELDS)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
+            .and(gte(IMAP_UID, from))
+            .and(lte(IMAP_UID, to));
+    }
+
+    private Where selectMessage(Mailbox<CassandraId> mailbox, long uid) {
+        return select(FIELDS)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid()))
+            .and(eq(IMAP_UID, uid));
     }
 
     private Stream<Row> convertToStream(ResultSet resultSet) {
         return StreamSupport.stream(resultSet.spliterator(), true);
     }
 
-
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org