You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2017/07/10 17:54:21 UTC

[09/41] james-project git commit: JAMES-2082 Implement blob storage

JAMES-2082 Implement blob storage


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ee45bd4e
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ee45bd4e
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ee45bd4e

Branch: refs/heads/master
Commit: ee45bd4e3631125cc03c8527e8b2b63b8a6b343e
Parents: 67aa191
Author: quynhn <qn...@linagora.com>
Authored: Mon Jul 3 18:07:55 2017 +0700
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Mon Jul 10 14:23:55 2017 +0200

----------------------------------------------------------------------
 .../apache/james/mailbox/cassandra/BlobId.java  |  73 ++++++++
 .../apache/james/mailbox/cassandra/PartId.java  |  72 ++++++++
 .../cassandra/mail/CassandraBlobsDAO.java       | 177 +++++++++++++++++++
 .../cassandra/mail/utils/DataChunker.java       |  52 ++++++
 .../cassandra/modules/CassandraBlobModule.java  |  64 +++++++
 .../mailbox/cassandra/table/BlobTable.java      |  33 ++++
 .../table/CassandraMessageV2Table.java          |  67 +++++++
 .../james/mailbox/cassandra/ids/BlobIdTest.java |  83 +++++++++
 .../james/mailbox/cassandra/ids/PartIdTest.java |  87 +++++++++
 .../cassandra/mail/CassandraBlobsDAOTest.java   | 126 +++++++++++++
 .../cassandra/mail/utils/DataChunkerTest.java   | 139 +++++++++++++++
 11 files changed, 973 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java
new file mode 100644
index 0000000..50601b6
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java
@@ -0,0 +1,73 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+public class BlobId {
+    public static BlobId forPayload(byte[] payload) {
+        Preconditions.checkArgument(payload != null);
+        return new BlobId(DigestUtils.sha1Hex(payload));
+    }
+
+    public static BlobId from(String id) {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(id));
+        return new BlobId(id);
+    }
+
+    private final String id;
+
+    @VisibleForTesting
+    BlobId(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public final boolean equals(Object obj) {
+        if (obj instanceof BlobId) {
+            BlobId other = (BlobId) obj;
+            return Objects.equal(id, other.id);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects
+            .toStringHelper(this)
+            .add("id", id)
+            .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java
new file mode 100644
index 0000000..421be7f
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+public class PartId {
+    public static PartId create(BlobId blobId, int position) {
+        Preconditions.checkNotNull(blobId);
+        Preconditions.checkArgument(position >= 0, "Position needs to be positive");
+        return new PartId(blobId.getId() + "-" + position);
+    }
+
+    public static PartId from(String id) {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(id));
+        return new PartId(id);
+    }
+
+    private final String id;
+
+    @VisibleForTesting
+    PartId(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public final boolean equals(Object obj) {
+        if (obj instanceof PartId) {
+            PartId other = (PartId) obj;
+            return Objects.equal(id, other.id);
+        }
+        return false;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects
+            .toStringHelper(this)
+            .add("id", id)
+            .toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
new file mode 100644
index 0000000..0d20e88
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
@@ -0,0 +1,177 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.mailbox.cassandra.BlobId;
+import org.apache.james.mailbox.cassandra.PartId;
+import org.apache.james.mailbox.cassandra.mail.utils.DataChunker;
+import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.OptionalConverter;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Bytes;
+
+
+public class CassandraBlobsDAO {
+
+    public static final int CHUNK_SIZE = 1024 * 100;
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final PreparedStatement insert;
+    private final PreparedStatement insertPart;
+    private final PreparedStatement select;
+    private final PreparedStatement selectPart;
+    private final DataChunker dataChunker;
+
+    @Inject
+    public CassandraBlobsDAO(Session session) {
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.dataChunker = new DataChunker();
+        this.insert = prepareInsert(session);
+        this.select = prepareSelect(session);
+
+        this.insertPart = prepareInsertPart(session);
+        this.selectPart = prepareSelectPart(session);
+    }
+
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(select()
+            .from(Blobs.TABLE_NAME)
+            .where(eq(Blobs.ID, bindMarker(Blobs.ID))));
+    }
+
+    private PreparedStatement prepareSelectPart(Session session) {
+        return session.prepare(select()
+            .from(BlobParts.TABLE_NAME)
+            .where(eq(BlobParts.ID, bindMarker(BlobParts.ID))));
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(Blobs.TABLE_NAME)
+            .value(Blobs.ID, bindMarker(Blobs.ID))
+            .value(Blobs.POSITION, bindMarker(Blobs.POSITION))
+            .value(Blobs.PART, bindMarker(Blobs.PART)));
+    }
+
+    private PreparedStatement prepareInsertPart(Session session) {
+        return session.prepare(insertInto(BlobParts.TABLE_NAME)
+            .value(BlobParts.ID, bindMarker(BlobParts.ID))
+            .value(BlobParts.DATA, bindMarker(BlobParts.DATA)));
+    }
+
+    public CompletableFuture<Optional<BlobId>> save(byte[] data) {
+        if (data == null) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+        BlobId blobId = BlobId.forPayload(data);
+        return saveBlobParts(data, blobId)
+            .thenCompose(partIds -> saveBlobPartsReferences(blobId, partIds))
+            .thenApply(any -> Optional.of(blobId));
+    }
+
+    private CompletableFuture<Stream<Pair<Integer, PartId>>> saveBlobParts(byte[] data, BlobId blobId) {
+        return FluentFutureStream.of(
+            dataChunker.chunk(data, CHUNK_SIZE)
+                .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
+                    .thenApply(partId -> Pair.of(pair.getKey(), partId))))
+            .completableFuture();
+    }
+
+    private CompletableFuture<PartId> writePart(ByteBuffer data, BlobId blobId, int position) {
+        PartId partId = PartId.create(blobId, position);
+        return cassandraAsyncExecutor.executeVoid(
+            insertPart.bind()
+                .setString(BlobParts.ID, partId.getId())
+                .setBytes(BlobParts.DATA, data))
+            .thenApply(any -> partId);
+    }
+
+    private CompletableFuture<Stream<Void>> saveBlobPartsReferences(BlobId blobId, Stream<Pair<Integer, PartId>> stream) {
+        return FluentFutureStream.of(stream.map(pair ->
+            cassandraAsyncExecutor.executeVoid(insert.bind()
+                .setString(Blobs.ID, blobId.getId())
+                .setLong(Blobs.POSITION, pair.getKey())
+                .setString(Blobs.PART, pair.getValue().getId()))))
+            .completableFuture();
+    }
+
+    public CompletableFuture<byte[]> read(BlobId blobId) {
+        return cassandraAsyncExecutor.execute(
+            select.bind()
+                .setString(Blobs.ID, blobId.getId()))
+            .thenApply(this::toPartIds)
+            .thenCompose(this::toDataParts)
+            .thenApply(this::concatenateDataParts);
+    }
+
+    private ImmutableMap<Long, PartId> toPartIds(ResultSet resultSet) {
+        return CassandraUtils.convertToStream(resultSet)
+            .map(row -> Pair.of(row.getLong(Blobs.POSITION), PartId.from(row.getString(Blobs.PART))))
+            .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue));
+    }
+
+    private CompletableFuture<Stream<Optional<Row>>> toDataParts(ImmutableMap<Long, PartId> positionToIds) {
+        return CompletableFutureUtil.chainAll(
+            positionToIds.values().stream(),
+            this::readPart);
+    }
+
+    private byte[] concatenateDataParts(Stream<Optional<Row>> rows) {
+        ImmutableList<byte[]> parts = rows.flatMap(OptionalConverter::toStream)
+            .map(this::rowToData)
+            .collect(Guavate.toImmutableList());
+
+        return Bytes.concat(parts.toArray(new byte[parts.size()][]));
+    }
+
+    private byte[] rowToData(Row row) {
+        byte[] data = new byte[row.getBytes(BlobParts.DATA).remaining()];
+        row.getBytes(BlobParts.DATA).get(data);
+        return data;
+    }
+
+    private CompletableFuture<Optional<Row>> readPart(PartId partId) {
+        return cassandraAsyncExecutor.executeSingleRow(selectPart.bind()
+            .setString(BlobParts.ID, partId.getId()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
new file mode 100644
index 0000000..57e0bfc
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.utils;
+
+import java.nio.ByteBuffer;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+
+public class DataChunker {
+
+    public Stream<Pair<Integer, ByteBuffer>> chunk(byte[] data, int chunkSize) {
+        Preconditions.checkNotNull(data);
+        Preconditions.checkArgument(chunkSize > 0, "ChunkSize can not be negative");
+
+        int size = data.length;
+        int fullChunkCount = size / chunkSize;
+
+        return Stream.concat(
+            IntStream.range(0, fullChunkCount)
+                .mapToObj(i -> Pair.of(i, ByteBuffer.wrap(data, i * chunkSize, chunkSize))),
+            lastChunk(data, chunkSize * fullChunkCount, fullChunkCount));
+    }
+
+    private Stream<Pair<Integer, ByteBuffer>> lastChunk(byte[] data, int offset, int index) {
+        if (offset == data.length && index > 0) {
+            return Stream.empty();
+        }
+        return Stream.of(Pair.of(index, ByteBuffer.wrap(data, offset, data.length - offset)));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
new file mode 100644
index 0000000..4eea870
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
@@ -0,0 +1,64 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.modules;
+
+import java.util.List;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.components.CassandraTable;
+import org.apache.james.backends.cassandra.components.CassandraType;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.google.common.collect.ImmutableList;
+
+public class CassandraBlobModule implements CassandraModule {
+
+    private final List<CassandraTable> tables;
+    private final List<CassandraType> types;
+
+    public CassandraBlobModule() {
+        tables = ImmutableList.of(
+                new CassandraTable(Blobs.TABLE_NAME,
+                        SchemaBuilder.createTable(Blobs.TABLE_NAME)
+                                .ifNotExists()
+                                .addPartitionKey(Blobs.ID, DataType.text())
+                                .addClusteringColumn(Blobs.POSITION, DataType.bigint())
+                                .addColumn(Blobs.PART, DataType.text())),
+                new CassandraTable(BlobParts.TABLE_NAME,
+                        SchemaBuilder.createTable(BlobParts.TABLE_NAME)
+                                .ifNotExists()
+                                .addPartitionKey(BlobParts.ID, DataType.text())
+                                .addColumn(BlobParts.DATA, DataType.blob())));
+        types = ImmutableList.of();
+    }
+
+    @Override
+    public List<CassandraTable> moduleTables() {
+        return tables;
+    }
+
+    @Override
+    public List<CassandraType> moduleTypes() {
+        return types;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
new file mode 100644
index 0000000..be097a5
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
@@ -0,0 +1,33 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+public interface BlobsTable {
+    String TABLE_NAME = "blobs";
+    String ID = "id";
+    String POSITION = "position";
+    String PART = "part";
+
+    interface BlobParts {
+        String TABLE_NAME = "blobParts";
+        String ID = "id";
+        String DATA = "data";
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
new file mode 100644
index 0000000..f7bc698
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+
+public interface CassandraMessageV2Table {
+
+    String TABLE_NAME = "messageV2";
+    String INTERNAL_DATE = "internalDate";
+    String BODY_START_OCTET = "bodyStartOctet";
+    String FULL_CONTENT_OCTETS = "fullContentOctets";
+    String BODY_OCTECTS = "bodyOctets";
+    String TEXTUAL_LINE_COUNT = "textualLineCount";
+    String BODY_CONTENT = "bodyContent";
+    String HEADER_CONTENT = "headerContent";
+    String PROPERTIES = "properties";
+    String ATTACHMENTS = "attachments";
+
+    String[] FIELDS = { MESSAGE_ID, INTERNAL_DATE, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, BODY_CONTENT, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES, ATTACHMENTS };
+    String[] METADATA = { MESSAGE_ID, INTERNAL_DATE, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, TEXTUAL_LINE_COUNT, PROPERTIES };
+    String[] HEADERS = { MESSAGE_ID, INTERNAL_DATE, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, HEADER_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES };
+    String[] BODY = { MESSAGE_ID, INTERNAL_DATE, BODY_START_OCTET, FULL_CONTENT_OCTETS, BODY_OCTECTS, BODY_CONTENT, TEXTUAL_LINE_COUNT, PROPERTIES, ATTACHMENTS };
+
+    interface Properties {
+        String NAMESPACE = "namespace";
+        String NAME = "name";
+        String VALUE = "value";
+    }
+
+    interface Attachments {
+        String ID = "id";
+        String NAME = "name";
+        String CID = "cid";
+        String IS_INLINE = "isInline";
+    }
+
+    interface Blobs {
+        String TABLE_NAME = "blobs";
+        String ID = "id";
+        String POSITION = "position";
+        String PART = "part";
+    }
+
+    interface BlobParts {
+        String TABLE_NAME = "blobParts";
+        String ID = "id";
+        String DATA = "data";
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java
new file mode 100644
index 0000000..56d6356
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java
@@ -0,0 +1,83 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.ids;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.base.Charsets;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+public class BlobIdTest {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void shouldRespectBeanContract() {
+        EqualsVerifier.forClass(BlobId.class).verify();
+    }
+
+    @Test
+    public void fromShouldConstructBlobId() {
+        String id = "111";
+        assertThat(BlobId.from(id))
+            .isEqualTo(new BlobId(id));
+    }
+
+    @Test
+    public void fromShouldThrowOnNull() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        BlobId.from(null);
+    }
+
+    @Test
+    public void fromShouldThrowOnEmpty() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        BlobId.from("");
+    }
+
+    @Test
+    public void forPayloadShouldThrowOnNull() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        BlobId.forPayload(null);
+    }
+
+    @Test
+    public void forPayloadShouldHashEmptyArray() {
+        BlobId blobId = BlobId.forPayload(new byte[0]);
+
+        assertThat(blobId.getId()).isEqualTo("da39a3ee5e6b4b0d3255bfef95601890afd80709");
+    }
+
+    @Test
+    public void forPayloadShouldHashArray() {
+        BlobId blobId = BlobId.forPayload("content".getBytes(Charsets.UTF_8));
+
+        assertThat(blobId.getId()).isEqualTo("040f06fd774092478d450774f5ba30c5da78acc8");
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
new file mode 100644
index 0000000..b236c55
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
@@ -0,0 +1,87 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.ids;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+public class PartIdTest {
+    private static final BlobId BLOB_ID = BlobId.from("abc");
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void shouldRespectBeanContract() {
+        EqualsVerifier.forClass(PartId.class).verify();
+    }
+
+    @Test
+    public void test () {
+        String id = "111";
+        assertThat(PartId.from(id))
+            .isEqualTo(new PartId(id));
+    }
+
+    @Test
+    public void fromShouldThrowOnNull() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        PartId.from(null);
+    }
+
+    @Test
+    public void fromShouldThrowOnEmpty() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        PartId.from("");
+    }
+
+    @Test
+    public void createShouldThrowOnNullBlobId() {
+        expectedException.expect(NullPointerException.class);
+
+        PartId.create(null, 1);
+    }
+
+    @Test
+    public void createShouldThrowOnNegativePosition() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        PartId.create(BLOB_ID, -1);
+    }
+
+    @Test
+    public void createShouldAcceptPositionZero() {
+        assertThat(PartId.create(BLOB_ID, 0).getId())
+            .isEqualTo(BLOB_ID.getId() + "-0");
+    }
+
+    @Test
+    public void createShouldConcatenateBlobIdAndPosition() {
+        assertThat(PartId.create(BLOB_ID, 36).getId())
+            .isEqualTo(BLOB_ID.getId() + "-36");
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
new file mode 100644
index 0000000..786d00e
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
@@ -0,0 +1,126 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.Charsets;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.mailbox.cassandra.BlobId;
+import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Strings;
+
+public class CassandraBlobsDAOTest {
+    private static final int MULTIPLE_CHUNK_SIZE = 3 * CassandraBlobsDAO.CHUNK_SIZE;
+    private CassandraCluster cassandra;
+    private CassandraBlobsDAO testee;
+
+    @Before
+    public void setUp() throws Exception {
+        cassandra = CassandraCluster.create(new CassandraBlobModule());
+        cassandra.ensureAllTables();
+
+        testee = new CassandraBlobsDAO(cassandra.getConf());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cassandra.clearAllTables();
+        cassandra.close();
+    }
+
+    @Test
+    public void saveShouldReturnEmptyWhenNullData() throws Exception {
+        Optional<BlobId> blobId = testee.save(null).join();
+
+        assertThat(blobId.isPresent()).isFalse();
+    }
+
+    @Test
+    public void saveShouldSaveEmptyData() throws Exception {
+        Optional<BlobId> blobId = testee.save(new byte[]{}).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(blobId.isPresent()).isTrue();
+        assertThat(new String(bytes, Charsets.UTF_8)).isEmpty();
+    }
+
+    @Test
+    public void saveShouldSaveBlankData() throws Exception {
+        Optional<BlobId> blobId = testee.save("".getBytes(Charsets.UTF_8)).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(blobId.isPresent()).isTrue();
+        assertThat(new String(bytes, Charsets.UTF_8)).isEmpty();
+    }
+
+    @Test
+    public void saveShouldReturnBlobId() throws Exception {
+        Optional<BlobId> blobId = testee.save("toto".getBytes(Charsets.UTF_8)).join();
+
+        assertThat(blobId.isPresent()).isTrue();
+    }
+
+    @Test
+    public void readShouldBeEmptyWhenNoExisting() throws IOException {
+        byte[] bytes = testee.read(BlobId.from("unknown")).join();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    public void readShouldReturnSavedData() throws IOException {
+        Optional<BlobId> blobId = testee.save("toto".getBytes(Charsets.UTF_8)).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(new String(bytes, Charsets.UTF_8)).isEqualTo("toto");
+    }
+
+    @Test
+    public void readShouldReturnLongSavedData() throws IOException {
+        String longString = Strings.repeat("0123456789\n", 1000);
+        Optional<BlobId> blobId = testee.save(longString.getBytes(Charsets.UTF_8)).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(new String(bytes, Charsets.UTF_8)).isEqualTo(longString);
+    }
+
+    @Test
+    public void readShouldReturnSplitSavedDataByChunk() throws IOException {
+        String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE);
+        Optional<BlobId> blobId = testee.save(longString.getBytes(Charsets.UTF_8)).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(new String(bytes, Charsets.UTF_8)).isEqualTo(longString);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java
new file mode 100644
index 0000000..ccb5e4f
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java
@@ -0,0 +1,139 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+public class DataChunkerTest {
+
+    public static final int CHUNK_SIZE = 10;
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    private DataChunker testee;
+
+    @Before
+    public void setUp() {
+        testee = new DataChunker();
+    }
+
+    @Test
+    public void chunkShouldThrowOnNullData() {
+        expectedException.expect(NullPointerException.class);
+
+        testee.chunk(null, CHUNK_SIZE);
+    }
+
+    @Test
+    public void chunkShouldThrowOnNegativeChunkSize() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        int chunkSize = -1;
+        testee.chunk(new byte[0], chunkSize);
+    }
+
+    @Test
+    public void chunkShouldThrowOnZeroChunkSize() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        int chunkSize = 0;
+        testee.chunk(new byte[0], chunkSize);
+    }
+
+    @Test
+    public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(new byte[0], CHUNK_SIZE);
+        assertThat(toArraysWithPosition(chunks))
+            .containsOnly(Pair.of(0, ImmutableList.of()));
+    }
+
+    @Test
+    public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+        byte[] data = "12345".getBytes(Charsets.UTF_8);
+
+        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
+
+        assertThat(toArraysWithPosition(chunks))
+            .containsOnly(Pair.of(0, ImmutableList.copyOf(ArrayUtils.toObject(data))));
+    }
+
+    @Test
+    public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+        byte[] data = "1234567890".getBytes(Charsets.UTF_8);
+        assertThat(data.length).isEqualTo(CHUNK_SIZE);
+
+        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
+
+        assertThat(toArraysWithPosition(chunks))
+            .containsOnly(Pair.of(0, ImmutableList.copyOf(ArrayUtils.toObject(data))));
+    }
+
+    @Test
+    public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+        byte[] part1 = "1234567890".getBytes(Charsets.UTF_8);
+        byte[] part2 = "12345".getBytes(Charsets.UTF_8);
+        byte[] data = Bytes.concat(part1, part2);
+
+        Stream<Pair<Integer, ByteBuffer>> chunks = testee.chunk(data, CHUNK_SIZE);
+
+        assertThat(toArraysWithPosition(chunks))
+            .containsOnly(
+                Pair.of(0, ImmutableList.copyOf(ArrayUtils.toObject(part1))),
+                Pair.of(1, ImmutableList.copyOf(ArrayUtils.toObject(part2))));
+    }
+
+    private ImmutableList<Pair<Integer, List<Byte>>> toArraysWithPosition(Stream<Pair<Integer, ByteBuffer>> chunks) {
+        return chunks
+            .map(this::toByteArrayPair)
+            .collect(Guavate.toImmutableList());
+    }
+
+    private Pair<Integer, List<Byte>> toByteArrayPair(Pair<Integer, ByteBuffer> pair) {
+        try {
+            Byte[] bytes = ArrayUtils.toObject(IOUtils.toByteArray(new ByteBufferBackedInputStream(pair.getRight())));
+            return Pair.of(pair.getKey(), Arrays.asList(bytes));
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org