You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2017/08/17 11:17:02 UTC
[07/10] james-project git commit: JAMES-2122 Adding a log on
swallowed blobs
JAMES-2122 Adding a log on swallowed blobs
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/8c2eddf9
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/8c2eddf9
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/8c2eddf9
Branch: refs/heads/master
Commit: 8c2eddf9402de6002d210a37b2333a3a04f79768
Parents: 9507ba7
Author: benwa <bt...@linagora.com>
Authored: Thu Aug 17 10:19:20 2017 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Thu Aug 17 13:13:33 2017 +0200
----------------------------------------------------------------------
.../cassandra/mail/CassandraBlobsDAO.java | 47 +++++++++++++++-----
1 file changed, 36 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/8c2eddf9/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
index f6e3d21..819d427 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
@@ -41,16 +41,20 @@ import org.apache.james.mailbox.cassandra.table.BlobTable;
import org.apache.james.mailbox.cassandra.table.BlobTable.BlobParts;
import org.apache.james.util.FluentFutureStream;
import org.apache.james.util.OptionalConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes;
public class CassandraBlobsDAO {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobsDAO.class);
private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final PreparedStatement insert;
private final PreparedStatement insertPart;
@@ -113,7 +117,7 @@ public class CassandraBlobsDAO {
}
private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) {
- return FluentFutureStream.<Pair<Integer, Void>> of(
+ return FluentFutureStream.of(
dataChunker.chunk(data, configuration.getBlobPartSize())
.map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
.thenApply(partId -> Pair.of(pair.getKey(), partId))))
@@ -146,23 +150,29 @@ public class CassandraBlobsDAO {
return cassandraAsyncExecutor.executeSingleRow(
select.bind()
.setString(BlobTable.ID, blobId.getId()))
- .thenCompose(this::toDataParts)
+ .thenCompose(row -> toDataParts(row, blobId))
.thenApply(this::concatenateDataParts);
}
- private CompletableFuture<Stream<Optional<Row>>> toDataParts(Optional<Row> blobRowOptional) {
+ private CompletableFuture<Stream<BlobPart>> toDataParts(Optional<Row> blobRowOptional, BlobId blobId) {
return blobRowOptional.map(blobRow -> {
- BlobId blobId = BlobId.from(blobRow.getString(BlobTable.ID));
int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK);
return FluentFutureStream.of(
IntStream.range(0, numOfChunk)
.mapToObj(position -> readPart(blobId, position)))
.completableFuture();
- }).orElse(CompletableFuture.completedFuture(Stream.empty()));
- }
-
- private byte[] concatenateDataParts(Stream<Optional<Row>> rows) {
- ImmutableList<byte[]> parts = rows.flatMap(OptionalConverter::toStream)
+ }).orElseGet(() -> {
+ LOGGER.warn("Could not retrieve blob metadata for {}", blobId);
+ return CompletableFuture.completedFuture(Stream.empty());
+ });
+ }
+
+ private byte[] concatenateDataParts(Stream<BlobPart> blobParts) {
+ ImmutableList<byte[]> parts = blobParts
+ .map(blobPart -> OptionalConverter.ifEmpty(
+ blobPart.row,
+ () -> LOGGER.warn("Missing blob part for blobId {} and position {}", blobPart.blobId, blobPart.position)))
+ .flatMap(OptionalConverter::toStream)
.map(this::rowToData)
.collect(Guavate.toImmutableList());
@@ -175,10 +185,25 @@ public class CassandraBlobsDAO {
return data;
}
- private CompletableFuture<Optional<Row>> readPart(BlobId blobId, int position) {
+ private CompletableFuture<BlobPart> readPart(BlobId blobId, int position) {
return cassandraAsyncExecutor.executeSingleRow(
selectPart.bind()
.setString(BlobTable.ID, blobId.getId())
- .setInt(BlobParts.CHUNK_NUMBER, position));
+ .setInt(BlobParts.CHUNK_NUMBER, position))
+ .thenApply(row -> new BlobPart(blobId, position, row));
+ }
+
+ private static class BlobPart {
+ private final BlobId blobId;
+ private final int position;
+ private final Optional<Row> row;
+
+ public BlobPart(BlobId blobId, int position, Optional<Row> row) {
+ Preconditions.checkNotNull(blobId);
+ Preconditions.checkArgument(position >= 0, "position need to be positive");
+ this.blobId = blobId;
+ this.position = position;
+ this.row = row;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org