You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2018/12/20 13:39:38 UTC
[1/5] james-project git commit: JAMES-2623 include logback for test
logs
Repository: james-project
Updated Branches:
refs/heads/master 6b2880416 -> 110e9a2a2
JAMES-2623 include logback for test logs
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/a3de0565
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/a3de0565
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/a3de0565
Branch: refs/heads/master
Commit: a3de05651b96fb000f3f9a011ad87d30ffe4d977
Parents: 2ac1fe6
Author: Matthieu Baechler <ma...@apache.org>
Authored: Fri Dec 7 16:32:18 2018 +0100
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Thu Dec 20 14:38:35 2018 +0100
----------------------------------------------------------------------
server/blob/blob-cassandra/pom.xml | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/a3de0565/server/blob/blob-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/pom.xml b/server/blob/blob-cassandra/pom.xml
index 42a77ef..63a3414 100644
--- a/server/blob/blob-cassandra/pom.xml
+++ b/server/blob/blob-cassandra/pom.xml
@@ -58,6 +58,11 @@
<artifactId>james-server-util</artifactId>
</dependency>
<dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[3/5] james-project git commit: JAMES-2623 demonstrate that Cassandra
is not able to load big blobs because of excessive //ism
Posted by ad...@apache.org.
JAMES-2623 demonstrate that Cassandra is not able to load big blobs because of excessive //ism
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/79e46ad5
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/79e46ad5
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/79e46ad5
Branch: refs/heads/master
Commit: 79e46ad51e2d7310097591a30ab0760772c9ff43
Parents: a3de056
Author: Matthieu Baechler <ma...@apache.org>
Authored: Mon Dec 10 18:50:36 2018 +0100
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Thu Dec 20 14:38:43 2018 +0100
----------------------------------------------------------------------
.../apache/james/blob/cassandra/CassandraBlobsDAOTest.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/79e46ad5/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
index b7d7980..86eeb86 100644
--- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
+++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java
@@ -21,6 +21,7 @@ package org.apache.james.blob.cassandra;
import static org.assertj.core.api.Assertions.assertThat;
+import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import org.apache.james.backends.cassandra.CassandraCluster;
@@ -31,6 +32,7 @@ import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.MetricableBlobStore;
import org.apache.james.blob.api.MetricableBlobStoreContract;
+import org.apache.james.util.ZeroedInputStream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -77,4 +79,11 @@ public class CassandraBlobsDAOTest implements MetricableBlobStoreContract {
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString);
}
+ @Test
+ void blobStoreShouldSupport100MBBlob() {
+ BlobId blobId = testee.save(new ZeroedInputStream(100_000_000)).join();
+ InputStream bytes = testee.read(blobId);
+ assertThat(bytes).hasSameContentAs(new ZeroedInputStream(100_000_000));
+ }
+
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[4/5] james-project git commit: JAMES-2623 implement cassandra
blobstore with Reactor
Posted by ad...@apache.org.
JAMES-2623 implement cassandra blobstore with Reactor
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/a4875cfa
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/a4875cfa
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/a4875cfa
Branch: refs/heads/master
Commit: a4875cfa96fdce5ca044e1d6e8adc683658c7b2f
Parents: 79e46ad
Author: Matthieu Baechler <ma...@linagora.com>
Authored: Fri Dec 7 14:52:21 2018 +0100
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Thu Dec 20 14:38:47 2018 +0100
----------------------------------------------------------------------
.../james/blob/cassandra/CassandraBlobsDAO.java | 184 ++++++++-----------
.../cassandra/utils/PipedStreamSubscriber.java | 90 +++++++++
.../src/resources/test/logback-test.xml | 28 +++
3 files changed, 198 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
index e079176..01c6e8d 100644
--- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java
@@ -24,15 +24,12 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import java.io.IOException;
import java.io.InputStream;
+import java.io.PipedInputStream;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.Pipe;
-import java.util.Optional;
+import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.stream.IntStream;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
@@ -47,24 +44,23 @@ import org.apache.james.blob.api.HashBlobId;
import org.apache.james.blob.api.ObjectStoreException;
import org.apache.james.blob.cassandra.BlobTable.BlobParts;
import org.apache.james.blob.cassandra.utils.DataChunker;
-import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.james.blob.cassandra.utils.PipedStreamSubscriber;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.consumers.ConsumerChainer;
-import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class CassandraBlobsDAO implements BlobStore {
- private static final Logger LOGGER = LoggerFactory.getLogger(CassandraBlobsDAO.class);
+
+ private static final int PREFETCH = 16;
+ private static final int MAX_CONCURRENCY = 2;
private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final PreparedStatement insert;
private final PreparedStatement insertPart;
@@ -122,71 +118,70 @@ public class CassandraBlobsDAO implements BlobStore {
public CompletableFuture<BlobId> save(byte[] data) {
Preconditions.checkNotNull(data);
- HashBlobId blobId = blobIdFactory.forPayload(data);
+ return saveAsMono(data).toFuture();
+ }
+
+ private Mono<BlobId> saveAsMono(byte[] data) {
+ BlobId blobId = blobIdFactory.forPayload(data);
return saveBlobParts(data, blobId)
- .thenCompose(numberOfChunk -> saveBlobPartsReferences(blobId, numberOfChunk))
- .thenApply(any -> blobId);
+ .flatMap(numberOfChunk -> saveBlobPartsReferences(blobId, numberOfChunk));
}
- private CompletableFuture<Integer> saveBlobParts(byte[] data, HashBlobId blobId) {
- return FluentFutureStream.of(
- dataChunker.chunk(data, configuration.getBlobPartSize())
- .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
- .thenApply(partId -> Pair.of(pair.getKey(), partId))))
- .completableFuture()
- .thenApply(stream ->
- getLastOfStream(stream)
- .map(numOfChunkAndPartId -> numOfChunkAndPartId.getLeft() + 1)
- .orElse(0));
+ private Mono<Integer> saveBlobParts(byte[] data, BlobId blobId) {
+ Stream<Pair<Integer, ByteBuffer>> chunks = dataChunker.chunk(data, configuration.getBlobPartSize());
+ return Flux.fromStream(chunks)
+ .publishOn(Schedulers.elastic(), PREFETCH)
+ .flatMap(pair -> writePart(pair.getValue(), blobId, getChunkNum(pair)))
+ .collect(Collectors.maxBy(Comparator.comparingInt(x -> x)))
+ .flatMap(Mono::justOrEmpty)
+ .map(this::numToCount)
+ .defaultIfEmpty(0);
}
- private static <T> Optional<T> getLastOfStream(Stream<T> stream) {
- return stream.reduce((first, second) -> second);
+ private int numToCount(int number) {
+ return number + 1;
}
- private CompletableFuture<Void> writePart(ByteBuffer data, HashBlobId blobId, int position) {
- return cassandraAsyncExecutor.executeVoid(
+ private Integer getChunkNum(Pair<Integer, ByteBuffer> pair) {
+ return pair.getKey();
+ }
+
+ private Mono<Integer> writePart(ByteBuffer data, BlobId blobId, int position) {
+ return cassandraAsyncExecutor.executeVoidReactor(
insertPart.bind()
.setString(BlobTable.ID, blobId.asString())
.setInt(BlobParts.CHUNK_NUMBER, position)
- .setBytes(BlobParts.DATA, data));
+ .setBytes(BlobParts.DATA, data))
+ .then(Mono.just(position));
}
- private CompletableFuture<Void> saveBlobPartsReferences(HashBlobId blobId, int numberOfChunk) {
- return cassandraAsyncExecutor.executeVoid(insert.bind()
- .setString(BlobTable.ID, blobId.asString())
- .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk));
+ private Mono<BlobId> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) {
+ return cassandraAsyncExecutor.executeVoidReactor(
+ insert.bind()
+ .setString(BlobTable.ID, blobId.asString())
+ .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk))
+ .then(Mono.just(blobId));
}
@Override
public CompletableFuture<byte[]> readBytes(BlobId blobId) {
- CompletableFuture<Row> futureRow = cassandraAsyncExecutor
- .executeSingleRow(
- select.bind()
- .setString(BlobTable.ID, blobId.asString()))
- .thenApply(x -> x.orElseThrow(() -> new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId))));
- return toDataParts(futureRow.join(), blobId)
- .thenApply(this::concatenateDataParts);
- }
-
- private CompletableFuture<Stream<BlobPart>> toDataParts(Row blobRow, BlobId blobId) {
- int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK);
- return FluentFutureStream.of(
- IntStream.range(0, numOfChunk)
- .mapToObj(position -> readPart(blobId, position)))
- .completableFuture();
+ try {
+ return readBlobParts(blobId)
+ .collectList()
+ .map(parts -> Bytes.concat(parts.toArray(new byte[0][])))
+ .toFuture();
+ } catch (ObjectStoreException e) {
+ CompletableFuture<byte[]> error = new CompletableFuture<>();
+ error.completeExceptionally(e);
+ return error;
+ }
}
- private byte[] concatenateDataParts(Stream<BlobPart> blobParts) {
- ImmutableList<byte[]> parts = blobParts
- .map(blobPart -> OptionalUtils.executeIfEmpty(
- blobPart.row,
- () -> LOGGER.warn("Missing blob part for blobId {} and position {}", blobPart.blobId, blobPart.position)))
- .flatMap(OptionalUtils::toStream)
- .map(this::rowToData)
- .collect(Guavate.toImmutableList());
-
- return Bytes.concat(parts.toArray(new byte[parts.size()][]));
+ private Mono<Integer> selectRowCount(BlobId blobId) {
+ return cassandraAsyncExecutor.executeSingleRowReactor(
+ select.bind()
+ .setString(BlobTable.ID, blobId.asString()))
+ .map(row -> row.getInt(BlobTable.NUMBER_OF_CHUNK));
}
private byte[] rowToData(Row row) {
@@ -195,60 +190,41 @@ public class CassandraBlobsDAO implements BlobStore {
return data;
}
- private CompletableFuture<BlobPart> readPart(BlobId blobId, int position) {
- return cassandraAsyncExecutor.executeSingleRow(
+ private Mono<byte[]> readPart(BlobId blobId, int position) {
+ return cassandraAsyncExecutor.executeSingleRowReactor(
selectPart.bind()
.setString(BlobTable.ID, blobId.asString())
.setInt(BlobParts.CHUNK_NUMBER, position))
- .thenApply(row -> new BlobPart(blobId, position, row));
- }
-
- private static class BlobPart {
- private final BlobId blobId;
- private final int position;
- private final Optional<Row> row;
-
- public BlobPart(BlobId blobId, int position, Optional<Row> row) {
- Preconditions.checkNotNull(blobId);
- Preconditions.checkArgument(position >= 0, "position need to be positive");
- this.blobId = blobId;
- this.position = position;
- this.row = row;
- }
+ .map(this::rowToData)
+ .switchIfEmpty(Mono.error(new IllegalStateException(
+ String.format("Missing blob part for blobId %s and position %d", blobId, position))));
}
@Override
public InputStream read(BlobId blobId) {
- try {
- Pipe pipe = Pipe.open();
- ConsumerChainer<ByteBuffer> consumer = Throwing.consumer(
- bytes -> {
- try (Pipe.SinkChannel sink = pipe.sink()) {
- sink.write(bytes);
- }
- }
- );
- readBytes(blobId)
- .thenApply(ByteBuffer::wrap)
- .thenAccept(consumer.sneakyThrow());
- return Channels.newInputStream(pipe.source());
- } catch (CompletionException e) {
- if (e.getCause() instanceof ObjectStoreException) {
- throw (ObjectStoreException)(e.getCause());
- }
- throw new RuntimeException(e);
- } catch (IOException cause) {
- throw new ObjectStoreException(
- "Failed to convert CompletableFuture<byte[]> to InputStream",
- cause);
- }
+ PipedInputStream pipedInputStream = new PipedInputStream();
+ readBlobParts(blobId)
+ .subscribe(new PipedStreamSubscriber(pipedInputStream));
+ return pipedInputStream;
+ }
+
+ private Flux<byte[]> readBlobParts(BlobId blobId) {
+ Integer rowCount = selectRowCount(blobId)
+ .publishOn(Schedulers.elastic())
+ .switchIfEmpty(Mono.error(
+ new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId))))
+ .block();
+ return Flux.range(0, rowCount)
+ .publishOn(Schedulers.elastic(), PREFETCH)
+ .flatMapSequential(partIndex -> readPart(blobId, partIndex), MAX_CONCURRENCY, PREFETCH);
}
@Override
public CompletableFuture<BlobId> save(InputStream data) {
Preconditions.checkNotNull(data);
- return CompletableFuture
- .supplyAsync(Throwing.supplier(() -> IOUtils.toByteArray(data)).sneakyThrow())
- .thenCompose(this::save);
+ return Mono.fromSupplier(Throwing.supplier(() -> IOUtils.toByteArray(data)).sneakyThrow())
+ .publishOn(Schedulers.elastic())
+ .flatMap(this::saveAsMono)
+ .toFuture();
}
}
http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
new file mode 100644
index 0000000..801d4c9
--- /dev/null
+++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/utils/PipedStreamSubscriber.java
@@ -0,0 +1,90 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.blob.cassandra.utils;
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.UncheckedIOException;
+
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import reactor.core.publisher.BaseSubscriber;
+
+public class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final PipedInputStream in;
+ private PipedOutputStream out;
+
+ public PipedStreamSubscriber(PipedInputStream in) {
+ Preconditions.checkNotNull(in, "The input stream must not be null");
+ this.in = in;
+ }
+
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+ super.hookOnSubscribe(subscription);
+ try {
+ this.out = new PipedOutputStream(in);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ protected void hookOnNext(byte[] payload) {
+ try {
+ out.write(payload);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ close();
+ }
+
+ @Override
+ protected void hookOnError(Throwable error) {
+ logger.error("Failure processing stream", error);
+ close();
+ }
+
+ @Override
+ protected void hookOnCancel() {
+ close();
+ }
+
+ private void close() {
+ try {
+ if (out != null) {
+ out.close();
+ }
+ } catch (IOException ignored) {
+ //ignored
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/a4875cfa/server/blob/blob-cassandra/src/resources/test/logback-test.xml
----------------------------------------------------------------------
diff --git a/server/blob/blob-cassandra/src/resources/test/logback-test.xml b/server/blob/blob-cassandra/src/resources/test/logback-test.xml
new file mode 100644
index 0000000..b02f0d8
--- /dev/null
+++ b/server/blob/blob-cassandra/src/resources/test/logback-test.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+ <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+ <resetJUL>true</resetJUL>
+ </contextListener>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%-5level] %logger{15} - %msg%n%rEx</pattern>
+ <immediateFlush>false</immediateFlush>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="CONSOLE" />
+ </root>
+
+
+ <logger name="org.apache.james" level="WARN" >
+ <appender-ref ref="CONSOLE" />
+ </logger>
+
+ <logger name="org.apache.james.backends.cassandra.DockerCassandraRule" level="WARN" >
+ <appender-ref ref="CONSOLE" />
+ </logger>
+
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[5/5] james-project git commit: Merge branch
'reactor-for-cassandra-blob-dao-2'
Posted by ad...@apache.org.
Merge branch 'reactor-for-cassandra-blob-dao-2'
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/110e9a2a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/110e9a2a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/110e9a2a
Branch: refs/heads/master
Commit: 110e9a2a20246f0bddb32f183d5877fdca8d7a41
Parents: 6b28804 a4875cf
Author: Antoine Duprat <ad...@linagora.com>
Authored: Thu Dec 20 14:39:11 2018 +0100
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Thu Dec 20 14:39:11 2018 +0100
----------------------------------------------------------------------
server/blob/blob-cassandra/pom.xml | 5 +
.../james/blob/cassandra/CassandraBlobsDAO.java | 184 ++++++++-----------
.../cassandra/utils/PipedStreamSubscriber.java | 90 +++++++++
.../src/resources/test/logback-test.xml | 28 +++
.../blob/cassandra/CassandraBlobsDAOTest.java | 9 +
5 files changed, 212 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org
[2/5] james-project git commit: JAMES-2624 use
CompletableFutureUtil.exceptionallyFuture instead of a hack
Posted by ad...@apache.org.
JAMES-2624 use CompletableFutureUtil.exceptionallyFuture instead of a hack
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/2ac1fe6a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/2ac1fe6a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/2ac1fe6a
Branch: refs/heads/master
Commit: 2ac1fe6a40e1292e0f4dc0017ef0d9a2f4db05f1
Parents: 544924a
Author: Matthieu Baechler <ma...@apache.org>
Authored: Fri Dec 14 14:38:01 2018 +0100
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Thu Dec 20 14:38:35 2018 +0100
----------------------------------------------------------------------
.../java/org/apache/james/blob/memory/MemoryBlobStore.java | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/2ac1fe6a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
index 20c1fef..c76b975 100644
--- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
+++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java
@@ -25,12 +25,12 @@ import java.io.InputStream;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.ObjectStoreException;
+import org.apache.james.util.CompletableFutureUtil;
import com.google.common.base.Preconditions;
@@ -69,10 +69,7 @@ public class MemoryBlobStore implements BlobStore {
try {
return CompletableFuture.completedFuture(retrieveStoredValue(blobId));
} catch (ObjectStoreException e) {
- Supplier<byte[]> throwing = () -> {
- throw e;
- };
- return CompletableFuture.supplyAsync(throwing);
+ return CompletableFutureUtil.exceptionallyFuture(e);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org