Posted to notifications@james.apache.org by GitBox <gi...@apache.org> on 2021/08/11 08:12:15 UTC

[GitHub] [james-project] vttranlina commented on a change in pull request #574: JAMES-3544 Distributed implementation of upload repository

vttranlina commented on a change in pull request #574:
URL: https://github.com/apache/james-project/pull/574#discussion_r686601915



##########
File path: server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
##########
@@ -0,0 +1,68 @@
+package org.apache.james.jmap.cassandra.upload;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
+import java.io.InputStream;
+
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.Upload;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.jmap.api.model.UploadMetaData;
+import org.apache.james.jmap.api.model.UploadNotFoundException;
+import org.apache.james.jmap.api.upload.UploadRepository;
+import org.apache.james.mailbox.model.ContentType;
+import org.reactivestreams.Publisher;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.io.CountingInputStream;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class CassandraUploadRepository implements UploadRepository {
+    private final UploadDAO uploadDAO;
+    private final BlobStore blobStore;
+    private final BucketNameGenerator bucketNameGenerator;
+
+    public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, BucketNameGenerator bucketNameGenerator) {
+        this.uploadDAO = uploadDAO;
+        this.blobStore = blobStore;
+        this.bucketNameGenerator = bucketNameGenerator;
+    }
+
+    @Override
+    public Publisher<UploadId> upload(InputStream data, ContentType contentType, Username user) {
+        UploadId uploadId = generateId();
+        UploadBucketName uploadBucketName = bucketNameGenerator.current();
+        BucketName bucketName = uploadBucketName.asBucketName();
+
+        return Mono.fromCallable(() -> new CountingInputStream(data))
+            .flatMap(countingInputStream -> Mono.from(blobStore.save(bucketName, countingInputStream, LOW_COST))
+                .map(blobId -> new UploadDAO.UploadRepresentation(uploadId, bucketName, blobId, contentType, countingInputStream.getCount(), user))
+                .flatMap(upload -> uploadDAO.save(upload).thenReturn(upload.getId())));
+    }
+
+    @Override
+    public Publisher<Upload> retrieve(UploadId id, Username user) {
+        return uploadDAO.retrieve(id)
+            .filter(upload -> upload.getUser().equals(user))
+            .map(upload -> Upload.from(
+                UploadMetaData.from(id, upload.getContentType(), upload.getSize(), upload.getBlobId()),
+                () -> blobStore.read(upload.getBucketName(), upload.getBlobId(), LOW_COST)))
+            .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
+    }
+
+    public Mono<Void> purge() {
+        return Flux.from(blobStore.listBuckets())
+            .<UploadBucketName>handle((bucketName, sink) -> UploadBucketName.ofBucket(bucketName).ifPresentOrElse(sink::next, sink::complete))
+            .filter(bucketNameGenerator.evictionPredicate())
+            .concatMap(bucket -> blobStore.deleteBucket(bucket.asBucketName()))

Review comment:
   When I run this method, `deleteBucket` throws an exception.
   Deleting the head element of the collection works, but from the second
   element onward it throws `java.util.ConcurrentModificationException`.
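
One possible explanation, which is an assumption and not something confirmed in this thread, is that `listBuckets()` streams from a live view of the underlying collection, so the first `deleteBucket` modifies the collection while it is still being iterated. The sketch below reproduces that pattern in plain Java with a hypothetical in-memory store (`BucketDeletionSketch`, `newStore`, and the bucket names are illustrative and are not James APIs). It also shows that iterating over a snapshot of the names avoids the exception.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BucketDeletionSketch {
    // Hypothetical stand-in for an in-memory blob store: bucket name -> blob count.
    static Map<String, Integer> newStore() {
        Map<String, Integer> buckets = new HashMap<>();
        buckets.put("uploads-1", 3);
        buckets.put("uploads-2", 5);
        buckets.put("uploads-3", 1);
        return buckets;
    }

    // Deletes buckets while iterating the live key set.
    // The first removal succeeds; fetching the second element fails fast.
    // Returns true if every bucket was deleted, false if iteration was aborted.
    static boolean deleteWhileIterating(Map<String, Integer> buckets) {
        try {
            for (String name : buckets.keySet()) {
                buckets.remove(name);
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    // Copies the bucket names first, then deletes from the store.
    // The iteration runs over the copy, so removals cannot invalidate it.
    static void deleteFromSnapshot(Map<String, Integer> buckets) {
        List<String> names = new ArrayList<>(buckets.keySet());
        for (String name : names) {
            buckets.remove(name);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> live = newStore();
        System.out.println("live iteration completed: " + deleteWhileIterating(live));
        System.out.println("buckets left after live iteration: " + live.size());

        Map<String, Integer> snapshot = newStore();
        deleteFromSnapshot(snapshot);
        System.out.println("buckets left after snapshot iteration: " + snapshot.size());
    }
}
```

If this is indeed the cause, collecting the bucket names into a list before the `concatMap` step (or having the store return a copy) would be one way to avoid it. Whether that applies here depends on how the `BlobStore` implementation under test builds its `listBuckets()` publisher.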
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


