Posted to server-dev@james.apache.org by ad...@apache.org on 2018/12/20 13:39:41 UTC

[4/5] james-project git commit: JAMES-2623 implement cassandra blobstore with Reactor

JAMES-2623 implement cassandra blobstore with Reactor


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/a4875cfa
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/a4875cfa
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/a4875cfa

Branch: refs/heads/master
Commit: a4875cfa96fdce5ca044e1d6e8adc683658c7b2f
Parents: 79e46ad
Author: Matthieu Baechler <ma...@linagora.com>
Authored: Fri Dec 7 14:52:21 2018 +0100
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Thu Dec 20 14:38:47 2018 +0100

----------------------------------------------------------------------
 .../james/blob/cassandra/CassandraBlobsDAO.java | 184 ++++++++-----------
 .../cassandra/utils/PipedStreamSubscriber.java  |  90 +++++++++
 .../src/resources/test/logback-test.xml         |  28 +++
 3 files changed, 198 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index e079176..01c6e8d 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -24,15 +24,12 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
-import java.io.IOException;
 import java.io.InputStream;
+import java.io.PipedInputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.Pipe;
-import java.util.Optional;
+import java.util.Comparator;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
@@ -47,24 +44,23 @@ import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.api.ObjectStoreException;
 import org.apache.james.blob.cassandra.BlobTable.BlobParts;
 import org.apache.james.blob.cassandra.utils.DataChunker;
-import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.consumers.ConsumerChainer;
-import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraBlobsDAO implements BlobStore {
-    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobsDAO.class);
+
+    private static final int PREFETCH = 16;
+    private static final int MAX_CONCURRENCY = 2;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insert;
     private final PreparedStatement insertPart;
@@ -122,71 +118,70 @@ public class CassandraBlobsDAO implements BlobStore {
     public CompletableFuture<BlobId> save(byte[] data) {
         Preconditions.checkNotNull(data);
 
-        HashBlobId blobId = blobIdFactory.forPayload(data);
+        return saveAsMono(data).toFuture();
+    }
+
+    private Mono<BlobId> saveAsMono(byte[] data) {
+        BlobId blobId = blobIdFactory.forPayload(data);
         return saveBlobParts(data, blobId)
-            .thenCompose(numberOfChunk -> saveBlobPartsReferences(blobId, numberOfChunk))
-            .thenApply(any -> blobId);
+            .flatMap(numberOfChunk -> saveBlobPartsReferences(blobId, numberOfChunk));
     }
 
-    private CompletableFuture<Integer> saveBlobParts(byte[] data, HashBlobId blobId) {
-        return FluentFutureStream.of(
-            dataChunker.chunk(data, configuration.getBlobPartSize())
-                .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
-                    .thenApply(partId -> Pair.of(pair.getKey(), partId))))
-            .completableFuture()
-            .thenApply(stream ->
-                getLastOfStream(stream)
-                    .map(numOfChunkAndPartId -> numOfChunkAndPartId.getLeft() + 1)
-                    .orElse(0));
+    private Mono<Integer> saveBlobParts(byte[] data, BlobId blobId) {
+        Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
+        return Flux.fromStream(chunks)
+            .publishOn(Schedulers.elastic(), PREFETCH)
+            .flatMap(pair -> writePart(pair.getValue(), blobId, getChunkNum(pair)))
+            .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
+            .flatMap(Mono::justOrEmpty)
+            .map(this::numToCount)
+            .defaultIfEmpty(0);
     }
 
-    private static <T> Optional<T> getLastOfStream(Stream<T> stream) {
-        return stream.reduce((first, second) -> second);
+    private int numToCount(int number) {
+        return number + 1;
     }
 
-    private CompletableFuture<Void> writePart(ByteBuffer data, HashBlobId blobId, int position) {
-        return cassandraAsyncExecutor.executeVoid(
+    private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) {
+        return pair.getKey();
+    }
+
+    private Mono<Integer> writePart(ByteBuffer data, BlobId blobId, int position) {
+        return cassandraAsyncExecutor.executeVoidReactor(
             insertPart.bind()
                 .setString(BlobTable.ID, blobId.asString())
                 .setInt(BlobParts.CHUNK_NUMBER, position)
-                .setBytes(BlobParts.DATA, data));
+                .setBytes(BlobParts.DATA, data))
+            .then(Mono.just(position));
     }
 
-    private CompletableFuture<Void> saveBlobPartsReferences(HashBlobId blobId, int numberOfChunk) {
-        return cassandraAsyncExecutor.executeVoid(insert.bind()
-            .setString(BlobTable.ID, blobId.asString())
-            .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk));
+    private Mono<BlobId> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) {
+        return cassandraAsyncExecutor.executeVoidReactor(
+            insert.bind()
+                .setString(BlobTable.ID, blobId.asString())
+                .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk))
+            .then(Mono.just(blobId));
     }
 
     @Override
     public CompletableFuture<byte[]> readBytes(BlobId blobId) {
-        CompletableFuture<Row> futureRow = cassandraAsyncExecutor
-            .executeSingleRow(
-                select.bind()
-                    .setString(BlobTable.ID, blobId.asString()))
-            .thenApply(x -> x.orElseThrow(() -> new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId))));
-        return toDataParts(futureRow.join(), blobId)
-            .thenApply(this::concatenateDataParts);
-    }
-
-    private CompletableFuture<Stream<BlobPart>> toDataParts(Row blobRow, BlobId blobId) {
-        int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK);
-        return FluentFutureStream.of(
-            IntStream.range(0, numOfChunk)
-                .mapToObj(position -> readPart(blobId, position)))
-            .completableFuture();
+        try {
+            return readBlobParts(blobId)
+                .collectList()
+                .map(parts -> Bytes.concat(parts.toArray(new byte[0][])))
+                .toFuture();
+        } catch (ObjectStoreException e) {
+            CompletableFuture<byte[]> error = new CompletableFuture<>();
+            error.completeExceptionally(e);
+            return error;
+        }
     }
 
-    private byte[] concatenateDataParts(Stream<BlobPart> blobParts) {
-        ImmutableList<byte[]> parts = blobParts
-            .map(blobPart -> OptionalUtils.executeIfEmpty(
-                blobPart.row,
-                () -> LOGGER.warn("Missing blob part for blobId {} and position {}", blobPart.blobId, blobPart.position)))
-            .flatMap(OptionalUtils::toStream)
-            .map(this::rowToData)
-            .collect(Guavate.toImmutableList());
-
-        return Bytes.concat(parts.toArray(new byte[parts.size()][]));
+    private Mono<Integer> selectRowCount(BlobId blobId) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
+                select.bind()
+                    .setString(BlobTable.ID, blobId.asString()))
+            .map(row -> row.getInt(BlobTable.NUMBER_OF_CHUNK));
     }
 
     private byte[] rowToData(Row row) {
@@ -195,60 +190,41 @@ public class CassandraBlobsDAO implements BlobStore {
         return data;
     }
 
-    private CompletableFuture<BlobPart> readPart(BlobId blobId, int position) {
-        return cassandraAsyncExecutor.executeSingleRow(
+    private Mono<byte[]> readPart(BlobId blobId, int position) {
+        return cassandraAsyncExecutor.executeSingleRowReactor(
             selectPart.bind()
                 .setString(BlobTable.ID, blobId.asString())
                 .setInt(BlobParts.CHUNK_NUMBER, position))
-            .thenApply(row -> new BlobPart(blobId, position, row));
-    }
-
-    private static class BlobPart {
-        private final BlobId blobId;
-        private final int position;
-        private final Optional<Row> row;
-
-        public BlobPart(BlobId blobId, int position, Optional<Row> row) {
-            Preconditions.checkNotNull(blobId);
-            Preconditions.checkArgument(position >= 0, "position need to be positive");
-            this.blobId = blobId;
-            this.position = position;
-            this.row = row;
-        }
+            .map(this::rowToData)
+            .switchIfEmpty(Mono.error(new IllegalStateException(
+                String.format("Missing blob part for blobId %s and position %d", blobId, position))));
     }
 
     @Override
     public InputStream read(BlobId blobId) {
-        try {
-            Pipe pipe = Pipe.open();
-            ConsumerChainer<ByteBuffer> consumer = Throwing.consumer(
-                bytes -> {
-                    try (Pipe.SinkChannel sink = pipe.sink()) {
-                        sink.write(bytes);
-                    }
-                }
-            );
-            readBytes(blobId)
-                .thenApply(ByteBuffer::wrap)
-                .thenAccept(consumer.sneakyThrow());
-            return Channels.newInputStream(pipe.source());
-        } catch (CompletionException e) {
-            if (e.getCause() instanceof ObjectStoreException) {
-                throw (ObjectStoreException)(e.getCause());
-            }
-            throw new RuntimeException(e);
-        } catch (IOException cause) {
-            throw new ObjectStoreException(
-                "Failed to convert CompletableFuture<byte[]> to InputStream",
-                cause);
-        }
+        PipedInputStream pipedInputStream = new PipedInputStream();
+        readBlobParts(blobId)
+            .subscribe(new PipedStreamSubscriber(pipedInputStream));
+        return pipedInputStream;
+    }
+
+    private Flux<byte[]> readBlobParts(BlobId blobId) {
+        Integer rowCount = selectRowCount(blobId)
+            .publishOn(Schedulers.elastic())
+            .switchIfEmpty(Mono.error(
+                new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId))))
+            .block();
+        return Flux.range(0, rowCount)
+            .publishOn(Schedulers.elastic(), PREFETCH)
+            .flatMapSequential(partIndex -> readPart(blobId, partIndex), MAX_CONCURRENCY, PREFETCH);
     }
 
     @Override
     public CompletableFuture<BlobId> save(InputStream data) {
         Preconditions.checkNotNull(data);
-        return CompletableFuture
-            .supplyAsync(Throwing.supplier(() -> IOUtils.toByteArray(data)).sneakyThrow())
-            .thenCompose(this::save);
+        return Mono.fromSupplier(Throwing.supplier(() -> IOUtils.toByteArray(data)).sneakyThrow())
+            .publishOn(Schedulers.elastic())
+            .flatMap(this::saveAsMono)
+            .toFuture();
     }
 }
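
A note on the two Reactor patterns in the file above, with a self-contained
sketch. The save path derives the chunk count by taking the highest written
part index plus one (numToCount), falling back to 0 for an empty blob; the
read path uses flatMapSequential so parts come back in order while still
being fetched up to MAX_CONCURRENCY at a time. The sketch below reproduces
the counting pattern with an in-memory Flux standing in for the Cassandra
part writes (the class and variable names are illustrative, not from the
commit):

    import java.util.Comparator;
    import java.util.stream.Collectors;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class ChunkCountSketch {
        public static void main(String[] args) {
            // Stand-in for writePart: each write completes with its chunk index.
            Flux<Integer> writtenPositions = Flux.just(0, 1, 2);

            Mono<Integer> numberOfChunks = writtenPositions
                // Flux.collect with Collectors.maxBy yields a Mono<Optional<Integer>>
                .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
                // unwrap the Optional; an empty Optional makes the Mono empty
                .flatMap(Mono::justOrEmpty)
                // highest zero-based index + 1 == number of chunks (numToCount)
                .map(max -> max + 1)
                // an empty blob writes no parts, so report zero chunks
                .defaultIfEmpty(0);

            System.out.println(numberOfChunks.block()); // prints 3
        }
    }

Replacing the three positions with Flux.empty() makes block() return 0,
which is the defaultIfEmpty fallback the DAO relies on for empty payloads.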

http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
new file mode 100644
index 0000000..801d4c9
--- /dev/null
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.cassandra.utils;
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.UncheckedIOException;
+
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import reactor.core.publisher.BaseSubscriber;
+
+public class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private final PipedInputStream in;
+    private PipedOutputStream out;
+
+    public PipedStreamSubscriber(PipedInputStream in) {
+        Preconditions.checkNotNull(in, "The input stream must not be null");
+        this.in = in;
+    }
+
+    @Override
+    protected void hookOnSubscribe(Subscription subscription) {
+        super.hookOnSubscribe(subscription);
+        try {
+            this.out = new PipedOutputStream(in);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    protected void hookOnNext(byte[] payload) {
+        try {
+            out.write(payload);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    protected void hookOnComplete() {
+        close();
+    }
+
+    @Override
+    protected void hookOnError(Throwable error) {
+        logger.error("Failure processing stream", error);
+        close();
+    }
+
+    @Override
+    protected void hookOnCancel() {
+        close();
+    }
+
+    private void close() {
+        try {
+            if (out != null) {
+                out.close();
+            }
+        } catch (IOException ignored) {
+            //ignored
+        }
+    }
+}
\ No newline at end of file
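
PipedStreamSubscriber bridges a Flux<byte[]> to blocking InputStream
consumers: the pipe's write end is connected on subscribe, each onNext
payload is written through, and any terminal signal (complete, error,
cancel) closes the write end so the reader observes EOF. A minimal usage
sketch, assuming the class above is on the classpath (the class name and
sample payloads below are illustrative, not from the commit):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.PipedInputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber;

    import reactor.core.publisher.Flux;
    import reactor.core.scheduler.Schedulers;

    public class PipedStreamSketch {
        public static void main(String[] args) throws IOException {
            // Emit parts on another thread so pipe writes cannot deadlock the reader.
            Flux<byte[]> parts = Flux.just(
                    "hello ".getBytes(StandardCharsets.UTF_8),
                    "world".getBytes(StandardCharsets.UTF_8))
                .publishOn(Schedulers.elastic());

            PipedInputStream in = new PipedInputStream();
            parts.subscribe(new PipedStreamSubscriber(in));

            try (InputStream stream = in) {
                int b;
                while ((b = stream.read()) != -1) {
                    System.out.print((char) b); // prints: hello world
                }
            }
        }
    }

This is the same wiring the new read(BlobId) performs: the PipedInputStream
is handed to the caller immediately, and blob parts stream into it as the
Flux emits them.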

http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/resources/test/logback-test.xml
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/resources/test/logback-test.xml b/server/blob/blob-cassandra/src/resources/test/logback-test.xml
new file mode 100644
index 0000000..b02f0d8
--- /dev/null
+++ b/server/blob/blob-cassandra/src/resources/test/logback-test.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+        <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+                <resetJUL>true</resetJUL>
+        </contextListener>
+
+        <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+                <encoder>
+                        <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern>
+                        <immediateFlush>false</immediateFlush>
+                </encoder>
+        </appender>
+
+        <root level="WARN">
+                <appender-ref ref="CONSOLE" />
+        </root>
+
+
+        <logger name="org.apache.james" level="WARN" >
+                <appender-ref ref="CONSOLE" />
+        </logger>
+
+        <logger name="org.apache.james.backends.cassandra.DockerCassandraRule" level="WARN" >
+                <appender-ref ref="CONSOLE" />
+        </logger>
+
+</configuration>

