You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2017/08/17 11:17:02 UTC

[07/10] james-project git commit: JAMES-2122 Adding a log on swallowed blobs

JAMES-2122 Adding a log on swallowed blobs


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8c2eddf9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8c2eddf9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8c2eddf9

Branch: refs/heads/master
Commit: 8c2eddf9402de6002d210a37b2333a3a04f79768
Parents: 9507ba7
Author: benwa <bt...@linagora.com>
Authored: Thu Aug 17 10:19:20 2017 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu Aug 17 13:13:33 2017 +0200

----------------------------------------------------------------------
 .../cassandra/mail/CassandraBlobsDAO.java       | 47 +++++++++++++++-----
 1 file changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/8c2eddf9/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
index f6e3d21..819d427 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
@@ -41,16 +41,20 @@ import org.apache.james.mailbox.cassandra.table.BlobTable;
 import org.apache.james.mailbox.cassandra.table.BlobTable.BlobParts;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
 public class CassandraBlobsDAO {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobsDAO.class);
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insert;
     private final PreparedStatement insertPart;
@@ -113,7 +117,7 @@ public class CassandraBlobsDAO {
     }
 
     private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) {
-        return FluentFutureStream.<Pair<Integer, Void>> of(
+        return FluentFutureStream.of(
             dataChunker.chunk(data, configuration.getBlobPartSize())
                 .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
                     .thenApply(partId -> Pair.of(pair.getKey(), partId))))
@@ -146,23 +150,29 @@ public class CassandraBlobsDAO {
         return cassandraAsyncExecutor.executeSingleRow(
             select.bind()
                 .setString(BlobTable.ID, blobId.getId()))
-            .thenCompose(this::toDataParts)
+            .thenCompose(row -> toDataParts(row, blobId))
             .thenApply(this::concatenateDataParts);
     }
 
-    private CompletableFuture<Stream<Optional<Row>>> toDataParts(Optional<Row> blobRowOptional) {
+    private CompletableFuture<Stream<BlobPart>> toDataParts(Optional<Row> blobRowOptional, BlobId blobId) {
         return blobRowOptional.map(blobRow -> {
-            BlobId blobId = BlobId.from(blobRow.getString(BlobTable.ID));
             int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK);
             return FluentFutureStream.of(
                 IntStream.range(0, numOfChunk)
                     .mapToObj(position -> readPart(blobId, position)))
                 .completableFuture();
-        }).orElse(CompletableFuture.completedFuture(Stream.empty()));
-    }
-
-    private byte[] concatenateDataParts(Stream<Optional<Row>> rows) {
-        ImmutableList<byte[]> parts = rows.flatMap(OptionalConverter::toStream)
+        }).orElseGet(() -> {
+            LOGGER.warn("Could not retrieve blob metadata for {}", blobId);
+            return CompletableFuture.completedFuture(Stream.empty());
+        });
+    }
+
+    private byte[] concatenateDataParts(Stream<BlobPart> blobParts) {
+        ImmutableList<byte[]> parts = blobParts
+            .map(blobPart -> OptionalConverter.ifEmpty(
+                blobPart.row,
+                () -> LOGGER.warn("Missing blob part for blobId {} and position {}", blobPart.blobId, blobPart.position)))
+            .flatMap(OptionalConverter::toStream)
             .map(this::rowToData)
             .collect(Guavate.toImmutableList());
 
@@ -175,10 +185,25 @@ public class CassandraBlobsDAO {
         return data;
     }
 
-    private CompletableFuture<Optional<Row>> readPart(BlobId blobId, int position) {
+    private CompletableFuture<BlobPart> readPart(BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeSingleRow(
             selectPart.bind()
                 .setString(BlobTable.ID, blobId.getId())
-                .setInt(BlobParts.CHUNK_NUMBER, position));
+                .setInt(BlobParts.CHUNK_NUMBER, position))
+            .thenApply(row -> new BlobPart(blobId, position, row));
+    }
+
+    private static class BlobPart {
+        private final BlobId blobId;
+        private final int position;
+        private final Optional<Row> row;
+
+        public BlobPart(BlobId blobId, int position, Optional<Row> row) {
+            Preconditions.checkNotNull(blobId);
+            Preconditions.checkArgument(position >= 0, "position need to be positive");
+            this.blobId = blobId;
+            this.position = position;
+            this.row = row;
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org