You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ad...@apache.org on 2017/07/10 17:54:21 UTC
[09/41] james-project git commit: JAMES-2082 Implement blob storage
JAMES-2082 Implement blob storage
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/ee45bd4e
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/ee45bd4e
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/ee45bd4e
Branch: refs/heads/master
Commit: ee45bd4e3631125cc03c8527e8b2b63b8a6b343e
Parents: 67aa191
Author: quynhn <qn...@linagora.com>
Authored: Mon Jul 3 18:07:55 2017 +0700
Committer: Antoine Duprat <ad...@linagora.com>
Committed: Mon Jul 10 14:23:55 2017 +0200
----------------------------------------------------------------------
.../apache/james/mailbox/cassandra/BlobId.java | 73 ++++++++
.../apache/james/mailbox/cassandra/PartId.java | 72 ++++++++
.../cassandra/mail/CassandraBlobsDAO.java | 177 +++++++++++++++++++
.../cassandra/mail/utils/DataChunker.java | 52 ++++++
.../cassandra/modules/CassandraBlobModule.java | 64 +++++++
.../mailbox/cassandra/table/BlobTable.java | 33 ++++
.../table/CassandraMessageV2Table.java | 67 +++++++
.../james/mailbox/cassandra/ids/BlobIdTest.java | 83 +++++++++
.../james/mailbox/cassandra/ids/PartIdTest.java | 87 +++++++++
.../cassandra/mail/CassandraBlobsDAOTest.java | 126 +++++++++++++
.../cassandra/mail/utils/DataChunkerTest.java | 139 +++++++++++++++
11 files changed, 973 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java
new file mode 100644
index 0000000..50601b6
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/BlobId.java
@@ -0,0 +1,73 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Identifier of a stored blob.
+ *
+ * <p>An id either wraps an existing string (see {@link #from(String)}) or is derived from a
+ * payload (see {@link #forPayload(byte[])}), in which case it is the SHA-1 hex digest of
+ * that payload.</p>
+ */
+public class BlobId {
+    private final String id;
+
+    @VisibleForTesting
+    BlobId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Derives the id of a payload from its SHA-1 hex digest.
+     *
+     * @param payload content to identify, must not be null
+     * @return the id matching the payload
+     * @throws IllegalArgumentException when payload is null
+     */
+    public static BlobId forPayload(byte[] payload) {
+        boolean payloadIsPresent = payload != null;
+        Preconditions.checkArgument(payloadIsPresent);
+        String digest = DigestUtils.sha1Hex(payload);
+        return new BlobId(digest);
+    }
+
+    /**
+     * Wraps an already known identifier.
+     *
+     * @param id textual identifier, must be neither null nor empty
+     * @return the corresponding id
+     * @throws IllegalArgumentException when id is null or empty
+     */
+    public static BlobId from(String id) {
+        boolean idIsMissing = Strings.isNullOrEmpty(id);
+        Preconditions.checkArgument(!idIsMissing);
+        return new BlobId(id);
+    }
+
+    /**
+     * @return the textual form of this identifier
+     */
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public final boolean equals(Object obj) {
+        if (!(obj instanceof BlobId)) {
+            return false;
+        }
+        BlobId that = (BlobId) obj;
+        return Objects.equal(this.id, that.id);
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("id", id)
+            .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java
new file mode 100644
index 0000000..421be7f
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/PartId.java
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Identifier of one part (chunk) of a blob.
+ *
+ * <p>When built with {@link #create(BlobId, int)} the textual id is the blob id, a dash and
+ * the position of the part, e.g. {@code abc-0}.</p>
+ */
+public class PartId {
+    private final String id;
+
+    @VisibleForTesting
+    PartId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Builds the id of the part located at the given position of a blob.
+     *
+     * @param blobId id of the owning blob, must not be null
+     * @param position index of the part, zero or greater
+     * @return the id {@code <blobId>-<position>}
+     * @throws NullPointerException when blobId is null
+     * @throws IllegalArgumentException when position is below zero
+     */
+    public static PartId create(BlobId blobId, int position) {
+        Preconditions.checkNotNull(blobId);
+        boolean positionIsValid = position >= 0;
+        Preconditions.checkArgument(positionIsValid, "Position needs to be positive");
+        String partId = blobId.getId() + "-" + position;
+        return new PartId(partId);
+    }
+
+    /**
+     * Wraps an already known part identifier.
+     *
+     * @param id textual identifier, must be neither null nor empty
+     * @return the corresponding id
+     * @throws IllegalArgumentException when id is null or empty
+     */
+    public static PartId from(String id) {
+        boolean idIsMissing = Strings.isNullOrEmpty(id);
+        Preconditions.checkArgument(!idIsMissing);
+        return new PartId(id);
+    }
+
+    /**
+     * @return the textual form of this identifier
+     */
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public final boolean equals(Object obj) {
+        if (!(obj instanceof PartId)) {
+            return false;
+        }
+        PartId that = (PartId) obj;
+        return Objects.equal(this.id, that.id);
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hashCode(id);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("id", id)
+            .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
new file mode 100644
index 0000000..0d20e88
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAO.java
@@ -0,0 +1,177 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.mailbox.cassandra.BlobId;
+import org.apache.james.mailbox.cassandra.PartId;
+import org.apache.james.mailbox.cassandra.mail.utils.DataChunker;
+import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.FluentFutureStream;
+import org.apache.james.util.OptionalConverter;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.primitives.Bytes;
+
+
+/**
+ * Stores and retrieves binary blobs in Cassandra.
+ *
+ * <p>A blob is cut into chunks of at most {@link #CHUNK_SIZE} bytes. Every chunk is written
+ * as one row of the {@code BlobParts} table, then one row per chunk is written to the
+ * {@code Blobs} table to associate (blob id, position) with the id of the part.</p>
+ */
+public class CassandraBlobsDAO {
+
+    /** Maximum size, in bytes, of a single stored part. */
+    public static final int CHUNK_SIZE = 1024 * 100;
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final PreparedStatement insert;
+    private final PreparedStatement insertPart;
+    private final PreparedStatement select;
+    private final PreparedStatement selectPart;
+    private final DataChunker dataChunker;
+
+    /**
+     * Prepares the four statements used by this DAO against the given session.
+     *
+     * @param session Cassandra session used to prepare and execute the statements
+     */
+    @Inject
+    public CassandraBlobsDAO(Session session) {
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.dataChunker = new DataChunker();
+        this.insert = prepareInsert(session);
+        this.select = prepareSelect(session);
+
+        this.insertPart = prepareInsertPart(session);
+        this.selectPart = prepareSelectPart(session);
+    }
+
+    // Selects every (position, part) row of one blob.
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(select()
+            .from(Blobs.TABLE_NAME)
+            .where(eq(Blobs.ID, bindMarker(Blobs.ID))));
+    }
+
+    // Selects the row holding the data of one part.
+    private PreparedStatement prepareSelectPart(Session session) {
+        return session.prepare(select()
+            .from(BlobParts.TABLE_NAME)
+            .where(eq(BlobParts.ID, bindMarker(BlobParts.ID))));
+    }
+
+    // Inserts one (blob id, position) -> part id reference.
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(Blobs.TABLE_NAME)
+            .value(Blobs.ID, bindMarker(Blobs.ID))
+            .value(Blobs.POSITION, bindMarker(Blobs.POSITION))
+            .value(Blobs.PART, bindMarker(Blobs.PART)));
+    }
+
+    // Inserts the data of one part.
+    private PreparedStatement prepareInsertPart(Session session) {
+        return session.prepare(insertInto(BlobParts.TABLE_NAME)
+            .value(BlobParts.ID, bindMarker(BlobParts.ID))
+            .value(BlobParts.DATA, bindMarker(BlobParts.DATA)));
+    }
+
+    /**
+     * Stores the given data and returns the id it can later be read with.
+     *
+     * <p>The parts are written first and the references to them afterwards, so a reference
+     * is only written once the writes of all parts have completed.</p>
+     *
+     * @param data content to store; may be null
+     * @return a future holding the id of the stored blob, or an empty Optional (and no
+     *         write at all) when data is null
+     */
+    public CompletableFuture<Optional<BlobId>> save(byte[] data) {
+        if (data == null) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
+        BlobId blobId = BlobId.forPayload(data);
+        return saveBlobParts(data, blobId)
+            .thenCompose(partIds -> saveBlobPartsReferences(blobId, partIds))
+            .thenApply(any -> Optional.of(blobId));
+    }
+
+    // Writes every chunk of the data and yields (position, part id) for each of them.
+    private CompletableFuture<Stream<Pair<Integer, PartId>>> saveBlobParts(byte[] data, BlobId blobId) {
+        return FluentFutureStream.of(
+            dataChunker.chunk(data, CHUNK_SIZE)
+                .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
+                    .thenApply(partId -> Pair.of(pair.getKey(), partId))))
+            .completableFuture();
+    }
+
+    // Writes one chunk under the part id derived from the blob id and the chunk position.
+    private CompletableFuture<PartId> writePart(ByteBuffer data, BlobId blobId, int position) {
+        PartId partId = PartId.create(blobId, position);
+        return cassandraAsyncExecutor.executeVoid(
+            insertPart.bind()
+                .setString(BlobParts.ID, partId.getId())
+                .setBytes(BlobParts.DATA, data))
+            .thenApply(any -> partId);
+    }
+
+    // Writes one reference row per part of the blob.
+    private CompletableFuture<Stream<Void>> saveBlobPartsReferences(BlobId blobId, Stream<Pair<Integer, PartId>> stream) {
+        return FluentFutureStream.of(stream.map(pair ->
+            cassandraAsyncExecutor.executeVoid(insert.bind()
+                .setString(Blobs.ID, blobId.getId())
+                .setLong(Blobs.POSITION, pair.getKey())
+                .setString(Blobs.PART, pair.getValue().getId()))))
+            .completableFuture();
+    }
+
+    /**
+     * Reads back the content stored under the given id.
+     *
+     * <p>When no reference row exists for the id, the result is an empty array. When a
+     * reference row points to a part that cannot be found, the returned future completes
+     * exceptionally with an {@link IllegalStateException} instead of returning truncated
+     * content.</p>
+     *
+     * @param blobId id of the blob to read
+     * @return a future holding the concatenation of all parts of the blob
+     */
+    public CompletableFuture<byte[]> read(BlobId blobId) {
+        return cassandraAsyncExecutor.execute(
+            select.bind()
+                .setString(Blobs.ID, blobId.getId()))
+            .thenApply(this::toPartIds)
+            .thenCompose(this::toDataParts)
+            .thenApply(this::concatenateDataParts);
+    }
+
+    // Keeps the rows in the order the result set delivers them (ImmutableMap preserves
+    // insertion order); the concatenation below relies on that order being by position.
+    private ImmutableMap<Long, PartId> toPartIds(ResultSet resultSet) {
+        return CassandraUtils.convertToStream(resultSet)
+            .map(row -> Pair.of(row.getLong(Blobs.POSITION), PartId.from(row.getString(Blobs.PART))))
+            .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue));
+    }
+
+    // Reads the data row of every referenced part, in map iteration order.
+    private CompletableFuture<Stream<Optional<Row>>> toDataParts(ImmutableMap<Long, PartId> positionToIds) {
+        return CompletableFutureUtil.chainAll(
+            positionToIds.values().stream(),
+            this::readPart);
+    }
+
+    // Joins the data of all parts into a single array.
+    private byte[] concatenateDataParts(Stream<Optional<Row>> rows) {
+        ImmutableList<byte[]> parts = rows.flatMap(OptionalConverter::toStream)
+            .map(this::rowToData)
+            .collect(Guavate.toImmutableList());
+
+        return Bytes.concat(parts.toArray(new byte[parts.size()][]));
+    }
+
+    // Copies the remaining bytes of the DATA column into a fresh array.
+    private byte[] rowToData(Row row) {
+        ByteBuffer buffer = row.getBytes(BlobParts.DATA);
+        byte[] data = new byte[buffer.remaining()];
+        buffer.get(data);
+        return data;
+    }
+
+    // Reads one part; a referenced part without a row is reported as an error rather than
+    // skipped, otherwise the caller would silently receive incomplete content.
+    private CompletableFuture<Optional<Row>> readPart(PartId partId) {
+        return cassandraAsyncExecutor.executeSingleRow(selectPart.bind()
+                .setString(BlobParts.ID, partId.getId()))
+            .thenApply(row -> failIfMissing(partId, row));
+    }
+
+    // Returns the row untouched when present, throws otherwise.
+    private Optional<Row> failIfMissing(PartId partId, Optional<Row> row) {
+        if (!row.isPresent()) {
+            throw new IllegalStateException("Missing blob part " + partId.getId());
+        }
+        return row;
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
new file mode 100644
index 0000000..57e0bfc
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunker.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.utils;
+
+import java.nio.ByteBuffer;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Splits a byte array into consecutive chunks of a bounded size.
+ */
+public class DataChunker {
+
+    /**
+     * Cuts the data into chunks of {@code chunkSize} bytes, the last one holding the
+     * remainder.
+     *
+     * <p>The returned buffers wrap the given array, they do not copy it. Empty data yields
+     * exactly one empty chunk at index 0. Data whose length is a non-zero multiple of
+     * {@code chunkSize} yields only full chunks.</p>
+     *
+     * @param data content to split, must not be null
+     * @param chunkSize maximum size of a chunk, must be greater than zero
+     * @return pairs of (chunk index starting at 0, chunk content) in increasing index order
+     * @throws NullPointerException when data is null
+     * @throws IllegalArgumentException when chunkSize is zero or negative
+     */
+    public Stream<Pair<Integer, ByteBuffer>> chunk(byte[] data, int chunkSize) {
+        Preconditions.checkNotNull(data);
+        // Zero is rejected as well as negative values, so the message says so.
+        Preconditions.checkArgument(chunkSize > 0, "ChunkSize must be strictly positive");
+
+        int size = data.length;
+        int fullChunkCount = size / chunkSize;
+
+        return Stream.concat(
+            IntStream.range(0, fullChunkCount)
+                .mapToObj(i -> Pair.of(i, ByteBuffer.wrap(data, i * chunkSize, chunkSize))),
+            lastChunk(data, chunkSize * fullChunkCount, fullChunkCount));
+    }
+
+    // Emits the trailing chunk starting at offset. Nothing is emitted when the full chunks
+    // already cover all the data, except for empty data (index 0) where a single empty
+    // chunk is still produced.
+    private Stream<Pair<Integer, ByteBuffer>> lastChunk(byte[] data, int offset, int index) {
+        if (offset == data.length && index > 0) {
+            return Stream.empty();
+        }
+        return Stream.of(Pair.of(index, ByteBuffer.wrap(data, offset, data.length - offset)));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
new file mode 100644
index 0000000..4eea870
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/modules/CassandraBlobModule.java
@@ -0,0 +1,64 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.modules;
+
+import java.util.List;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.components.CassandraTable;
+import org.apache.james.backends.cassandra.components.CassandraType;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Cassandra module declaring the two tables used for blob storage: one holding the
+ * (blob id, position) to part id references and one holding the data of each part.
+ * It declares no user defined type.
+ */
+public class CassandraBlobModule implements CassandraModule {
+
+    private final List<CassandraTable> tables;
+    private final List<CassandraType> types;
+
+    public CassandraBlobModule() {
+        // Partitioned by blob id, one clustered row per position.
+        CassandraTable blobs = new CassandraTable(Blobs.TABLE_NAME,
+            SchemaBuilder.createTable(Blobs.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(Blobs.ID, DataType.text())
+                .addClusteringColumn(Blobs.POSITION, DataType.bigint())
+                .addColumn(Blobs.PART, DataType.text()));
+
+        // One row per part, keyed by part id.
+        CassandraTable blobParts = new CassandraTable(BlobParts.TABLE_NAME,
+            SchemaBuilder.createTable(BlobParts.TABLE_NAME)
+                .ifNotExists()
+                .addPartitionKey(BlobParts.ID, DataType.text())
+                .addColumn(BlobParts.DATA, DataType.blob()));
+
+        this.tables = ImmutableList.of(blobs, blobParts);
+        this.types = ImmutableList.of();
+    }
+
+    /**
+     * @return the table definitions of this module
+     */
+    @Override
+    public List<CassandraTable> moduleTables() {
+        return tables;
+    }
+
+    /**
+     * @return the type definitions of this module, always empty
+     */
+    @Override
+    public List<CassandraType> moduleTypes() {
+        return types;
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
new file mode 100644
index 0000000..be097a5
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/BlobTable.java
@@ -0,0 +1,33 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+/**
+ * Table and column names for blob storage.
+ *
+ * <p>The interface is named after its file, {@code BlobTable.java}: a public top-level type
+ * must carry the name of the file declaring it, otherwise the file does not compile.</p>
+ *
+ * <p>NOTE(review): the values duplicate {@code CassandraMessageV2Table.Blobs} and
+ * {@code CassandraMessageV2Table.BlobParts} - confirm which of the two is meant to be
+ * used.</p>
+ */
+public interface BlobTable {
+    String TABLE_NAME = "blobs";
+    String ID = "id";
+    String POSITION = "position";
+    String PART = "part";
+
+    /** Table and column names for the data of blob parts. */
+    interface BlobParts {
+        String TABLE_NAME = "blobParts";
+        String ID = "id";
+        String DATA = "data";
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
new file mode 100644
index 0000000..f7bc698
--- /dev/null
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/table/CassandraMessageV2Table.java
@@ -0,0 +1,67 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.table;
+
+import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
+
+/**
+ * Names used by the {@code messageV2} table, plus the names of the blob tables.
+ */
+public interface CassandraMessageV2Table {
+
+    String TABLE_NAME = "messageV2";
+    String INTERNAL_DATE = "internalDate";
+    String BODY_START_OCTET = "bodyStartOctet";
+    String FULL_CONTENT_OCTETS = "fullContentOctets";
+    String BODY_OCTECTS = "bodyOctets";
+    String TEXTUAL_LINE_COUNT = "textualLineCount";
+    String BODY_CONTENT = "bodyContent";
+    String HEADER_CONTENT = "headerContent";
+    String PROPERTIES = "properties";
+    String ATTACHMENTS = "attachments";
+
+    /** Every name declared above, preceded by the message id. */
+    String[] FIELDS = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        BODY_CONTENT,
+        HEADER_CONTENT,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES,
+        ATTACHMENTS
+    };
+
+    /** {@link #FIELDS} without body content, header content and attachments. */
+    String[] METADATA = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES
+    };
+
+    /** {@link #FIELDS} without body content and attachments. */
+    String[] HEADERS = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        HEADER_CONTENT,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES
+    };
+
+    /** {@link #FIELDS} without header content. */
+    String[] BODY = {
+        MESSAGE_ID,
+        INTERNAL_DATE,
+        BODY_START_OCTET,
+        FULL_CONTENT_OCTETS,
+        BODY_OCTECTS,
+        BODY_CONTENT,
+        TEXTUAL_LINE_COUNT,
+        PROPERTIES,
+        ATTACHMENTS
+    };
+
+    /** Names of the members of a property. */
+    interface Properties {
+        String NAMESPACE = "namespace";
+        String NAME = "name";
+        String VALUE = "value";
+    }
+
+    /** Names of the members of an attachment. */
+    interface Attachments {
+        String ID = "id";
+        String NAME = "name";
+        String CID = "cid";
+        String IS_INLINE = "isInline";
+    }
+
+    /** Names of the table referencing the parts of each blob. */
+    interface Blobs {
+        String TABLE_NAME = "blobs";
+        String ID = "id";
+        String POSITION = "position";
+        String PART = "part";
+    }
+
+    /** Names of the table holding the data of each blob part. */
+    interface BlobParts {
+        String TABLE_NAME = "blobParts";
+        String ID = "id";
+        String DATA = "data";
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java
new file mode 100644
index 0000000..56d6356
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/BlobIdTest.java
@@ -0,0 +1,83 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.ids;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.base.Charsets;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+/**
+ * Unit tests of {@code BlobId}: equality contract, argument validation of the factories
+ * and the SHA-1 based id derived from a payload.
+ */
+public class BlobIdTest {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void shouldRespectBeanContract() {
+        EqualsVerifier.forClass(BlobId.class).verify();
+    }
+
+    @Test
+    public void fromShouldConstructBlobId() {
+        BlobId expected = new BlobId("111");
+
+        assertThat(BlobId.from("111")).isEqualTo(expected);
+    }
+
+    @Test
+    public void fromShouldThrowOnNull() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        BlobId.from(null);
+    }
+
+    @Test
+    public void fromShouldThrowOnEmpty() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        BlobId.from("");
+    }
+
+    @Test
+    public void forPayloadShouldThrowOnNull() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        BlobId.forPayload(null);
+    }
+
+    @Test
+    public void forPayloadShouldHashEmptyArray() {
+        byte[] emptyPayload = new byte[0];
+
+        String actualId = BlobId.forPayload(emptyPayload).getId();
+
+        assertThat(actualId).isEqualTo("da39a3ee5e6b4b0d3255bfef95601890afd80709");
+    }
+
+    @Test
+    public void forPayloadShouldHashArray() {
+        byte[] payload = "content".getBytes(Charsets.UTF_8);
+
+        String actualId = BlobId.forPayload(payload).getId();
+
+        assertThat(actualId).isEqualTo("040f06fd774092478d450774f5ba30c5da78acc8");
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
new file mode 100644
index 0000000..b236c55
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/ids/PartIdTest.java
@@ -0,0 +1,87 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.ids;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+
+/**
+ * Unit tests of {@code PartId}: equality contract, argument validation of the factories
+ * and the {@code <blobId>-<position>} format produced by {@code create}.
+ */
+public class PartIdTest {
+    private static final BlobId BLOB_ID = BlobId.from("abc");
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void shouldRespectBeanContract() {
+        EqualsVerifier.forClass(PartId.class).verify();
+    }
+
+    // Named after the behaviour it checks, like the other tests of this class.
+    @Test
+    public void fromShouldConstructPartId() {
+        String id = "111";
+        assertThat(PartId.from(id))
+            .isEqualTo(new PartId(id));
+    }
+
+    @Test
+    public void fromShouldThrowOnNull() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        PartId.from(null);
+    }
+
+    @Test
+    public void fromShouldThrowOnEmpty() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        PartId.from("");
+    }
+
+    @Test
+    public void createShouldThrowOnNullBlobId() {
+        expectedException.expect(NullPointerException.class);
+
+        PartId.create(null, 1);
+    }
+
+    @Test
+    public void createShouldThrowOnNegativePosition() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        PartId.create(BLOB_ID, -1);
+    }
+
+    // Zero is the first valid position.
+    @Test
+    public void createShouldAcceptPositionZero() {
+        assertThat(PartId.create(BLOB_ID, 0).getId())
+            .isEqualTo(BLOB_ID.getId() + "-0");
+    }
+
+    @Test
+    public void createShouldConcatenateBlobIdAndPosition() {
+        assertThat(PartId.create(BLOB_ID, 36).getId())
+            .isEqualTo(BLOB_ID.getId() + "-36");
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
new file mode 100644
index 0000000..786d00e
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraBlobsDAOTest.java
@@ -0,0 +1,126 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.commons.io.Charsets;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.mailbox.cassandra.BlobId;
+import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Strings;
+
+/** Tests {@link CassandraBlobsDAO} save/read round trips against a {@link CassandraCluster} created per test. */
+public class CassandraBlobsDAOTest {
+    private static final int MULTIPLE_CHUNK_SIZE = 3 * CassandraBlobsDAO.CHUNK_SIZE;
+    private CassandraCluster cassandra;
+    private CassandraBlobsDAO testee;
+
+    @Before
+    public void setUp() throws Exception {
+        cassandra = CassandraCluster.create(new CassandraBlobModule());
+        cassandra.ensureAllTables();
+
+        testee = new CassandraBlobsDAO(cassandra.getConf());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        cassandra.clearAllTables();
+        cassandra.close();
+    }
+
+    @Test
+    public void saveShouldReturnEmptyWhenNullData() throws Exception {
+        Optional<BlobId> blobId = testee.save(null).join();
+
+        assertThat(blobId.isPresent()).isFalse();
+    }
+
+    @Test
+    public void saveShouldSaveEmptyData() throws Exception {
+        Optional<BlobId> blobId = testee.save(new byte[]{}).join();
+        // Assert presence before get(): an absent id must fail as an assertion, not as NoSuchElementException.
+        assertThat(blobId.isPresent()).isTrue();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+        assertThat(new String(bytes, Charsets.UTF_8)).isEmpty();
+    }
+
+    @Test
+    public void saveShouldSaveBlankData() throws Exception {
+        Optional<BlobId> blobId = testee.save("".getBytes(Charsets.UTF_8)).join();
+        // NOTE(review): "" is empty, not blank - same input as saveShouldSaveEmptyData. Presence is checked before get().
+        assertThat(blobId.isPresent()).isTrue();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+        assertThat(new String(bytes, Charsets.UTF_8)).isEmpty();
+    }
+
+    @Test
+    public void saveShouldReturnBlobId() throws Exception {
+        Optional<BlobId> blobId = testee.save("toto".getBytes(Charsets.UTF_8)).join();
+
+        assertThat(blobId.isPresent()).isTrue();
+    }
+
+    @Test
+    public void readShouldBeEmptyWhenNoExisting() throws IOException {
+        byte[] bytes = testee.read(BlobId.from("unknown")).join();
+
+        assertThat(bytes).isEmpty();
+    }
+
+    @Test
+    public void readShouldReturnSavedData() throws IOException {
+        Optional<BlobId> blobId = testee.save("toto".getBytes(Charsets.UTF_8)).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(new String(bytes, Charsets.UTF_8)).isEqualTo("toto");
+    }
+
+    @Test
+    public void readShouldReturnLongSavedData() throws IOException {
+        String longString = Strings.repeat("0123456789\n", 1000);
+        Optional<BlobId> blobId = testee.save(longString.getBytes(Charsets.UTF_8)).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(new String(bytes, Charsets.UTF_8)).isEqualTo(longString);
+    }
+
+    @Test
+    public void readShouldReturnSplitSavedDataByChunk() throws IOException {
+        String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE);
+        Optional<BlobId> blobId = testee.save(longString.getBytes(Charsets.UTF_8)).join();
+
+        byte[] bytes = testee.read(blobId.get()).join();
+
+        assertThat(new String(bytes, Charsets.UTF_8)).isEqualTo(longString);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/james-project/blob/ee45bd4e/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java
new file mode 100644
index 0000000..ccb5e4f
--- /dev/null
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/DataChunkerTest.java
@@ -0,0 +1,139 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.utils;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Bytes;
+
+/** Checks how {@link DataChunker#chunk} cuts a byte array into chunks paired with their position. */
+public class DataChunkerTest {
+
+    // Chunk size handed to chunk(...) unless a test is about an invalid size.
+    public static final int CHUNK_SIZE = 10;
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    private DataChunker testee;
+
+    @Before
+    public void setUp() {
+        testee = new DataChunker();
+    }
+
+    @Test
+    public void chunkShouldThrowOnNullData() {
+        expectedException.expect(NullPointerException.class);
+
+        testee.chunk(null, CHUNK_SIZE);
+    }
+
+    @Test
+    public void chunkShouldThrowOnNegativeChunkSize() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        testee.chunk(new byte[0], -1);
+    }
+
+    @Test
+    public void chunkShouldThrowOnZeroChunkSize() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        testee.chunk(new byte[0], 0);
+    }
+
+    @Test
+    public void chunkShouldReturnOneEmptyArrayWhenInputEmpty() {
+        ImmutableList<Pair<Integer, List<Byte>>> actual = toArraysWithPosition(testee.chunk(new byte[0], CHUNK_SIZE));
+
+        assertThat(actual).containsOnly(Pair.of(0, ImmutableList.of()));
+    }
+
+    @Test
+    public void chunkShouldReturnOneArrayWhenInputLessThanChunkSize() {
+        byte[] input = "12345".getBytes(Charsets.UTF_8);
+
+        ImmutableList<Pair<Integer, List<Byte>>> actual = toArraysWithPosition(testee.chunk(input, CHUNK_SIZE));
+
+        assertThat(actual).containsOnly(Pair.of(0, boxed(input)));
+    }
+
+    @Test
+    public void chunkShouldReturnOneArrayWhenInputEqualsChunkSize() {
+        byte[] input = "1234567890".getBytes(Charsets.UTF_8);
+        // Sanity check: the input must be exactly CHUNK_SIZE bytes long for this test to be meaningful.
+        assertThat(input.length).isEqualTo(CHUNK_SIZE);
+
+        ImmutableList<Pair<Integer, List<Byte>>> actual = toArraysWithPosition(testee.chunk(input, CHUNK_SIZE));
+
+        assertThat(actual).containsOnly(Pair.of(0, boxed(input)));
+    }
+
+    @Test
+    public void chunkShouldReturnSeveralArrayWhenInputBiggerThanChunkSize() {
+        byte[] head = "1234567890".getBytes(Charsets.UTF_8);
+        byte[] tail = "12345".getBytes(Charsets.UTF_8);
+
+        ImmutableList<Pair<Integer, List<Byte>>> actual = toArraysWithPosition(testee.chunk(Bytes.concat(head, tail), CHUNK_SIZE));
+
+        assertThat(actual).containsOnly(Pair.of(0, boxed(head)), Pair.of(1, boxed(tail)));
+    }
+
+    // Converts every (position, buffer) pair of the stream into a (position, boxed bytes) pair.
+    private ImmutableList<Pair<Integer, List<Byte>>> toArraysWithPosition(Stream<Pair<Integer, ByteBuffer>> chunks) {
+        return chunks.map(chunk -> toByteArrayPair(chunk)).collect(Guavate.toImmutableList());
+    }
+
+    // Reads the buffer through an input stream and boxes the bytes, keeping the pair's left value.
+    private Pair<Integer, List<Byte>> toByteArrayPair(Pair<Integer, ByteBuffer> pair) {
+        try {
+            byte[] content = IOUtils.toByteArray(new ByteBufferBackedInputStream(pair.getRight()));
+            return Pair.of(pair.getLeft(), Arrays.asList(ArrayUtils.toObject(content)));
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    // Boxes a raw byte array so it can be compared with the lists built from the chunks.
+    private List<Byte> boxed(byte[] raw) {
+        return ImmutableList.copyOf(ArrayUtils.toObject(raw));
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org