You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/02/23 09:19:52 UTC

[1/2] james-project git commit: JAMES-1945 Introduce JamesCollectors::chunker

Repository: james-project
Updated Branches:
  refs/heads/master f3140967a -> e25335842


JAMES-1945 Introduce JamesCollectors::chunker


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d43ce78c
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d43ce78c
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d43ce78c

Branch: refs/heads/master
Commit: d43ce78c6b20f4e38e69376fb77ad6003ae4bb8a
Parents: f314096
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 22 10:59:51 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Feb 23 16:18:21 2017 +0700

----------------------------------------------------------------------
 .../james/util/streams/JamesCollectors.java     |  36 +++++
 .../james/util/streams/JamesCollectorsTest.java | 130 +++++++++++++++++++
 2 files changed, 166 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d43ce78c/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
new file mode 100644
index 0000000..10c34bd
--- /dev/null
+++ b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
@@ -0,0 +1,36 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.streams;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import com.google.common.base.Preconditions;
+
+public class JamesCollectors {
+    public static <D> Collector<D, ?, Map<Integer, List<D>>> chunker(int chunkSize) {
+        Preconditions.checkArgument(chunkSize > 0, "ChunkSize should be strictly positive");
+        AtomicInteger counter = new AtomicInteger(-1);
+        return Collectors.groupingBy(x -> counter.incrementAndGet() / chunkSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/d43ce78c/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
----------------------------------------------------------------------
diff --git a/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java b/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
new file mode 100644
index 0000000..4981735
--- /dev/null
+++ b/server/container/util-java8/src/test/java/org/apache/james/util/streams/JamesCollectorsTest.java
@@ -0,0 +1,130 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.streams;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableList;
+
+public class JamesCollectorsTest {
+
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void chunkerShouldAcceptEmptyStrem() {
+        Stream<Integer> emptyStream = Stream.of();
+
+        assertThat(emptyStream.collect(JamesCollectors.chunker(10)))
+            .isEmpty();
+    }
+
+    @Test
+    public void chunkerShouldThrowOnZeroChunkSize() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        JamesCollectors.chunker(0);
+    }
+
+    @Test
+    public void chunkerShouldThrowOnNegativeChunkSize() {
+        expectedException.expect(IllegalArgumentException.class);
+
+        JamesCollectors.chunker(-1);
+    }
+
+    @Test
+    public void chunkerShouldChunkMonoValueStreams() {
+        Stream<Integer> monoValueStream = Stream.of(1);
+
+        List<List<Integer>> values = monoValueStream.collect(JamesCollectors.chunker(10))
+            .values()
+            .stream()
+            .map(ImmutableList::copyOf)
+            .collect(Guavate.toImmutableList());
+        assertThat(values)
+            .isEqualTo(ImmutableList.of(ImmutableList.of(1)));
+    }
+
+    @Test
+    public void chunkerShouldChunkStreamsSmallerThanChunkSize() {
+        Stream<Integer> stream = Stream.of(1, 2);
+
+        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
+            .values()
+            .stream()
+            .map(ImmutableList::copyOf)
+            .collect(Guavate.toImmutableList());
+        assertThat(values)
+            .isEqualTo(ImmutableList.of(ImmutableList.of(1, 2)));
+    }
+
+    @Test
+    public void chunkerShouldChunkStreamsAsBigAsChunkSize() {
+        Stream<Integer> stream = Stream.of(1, 2, 3);
+
+        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
+            .values()
+            .stream()
+            .map(ImmutableList::copyOf)
+            .collect(Guavate.toImmutableList());
+        assertThat(values)
+            .isEqualTo(ImmutableList.of(ImmutableList.of(1, 2, 3)));
+    }
+
+    @Test
+    public void chunkerShouldChunkStreamsBiggerThanChunkSize() {
+        Stream<Integer> stream = Stream.of(1, 2, 3, 4);
+
+        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
+            .values()
+            .stream()
+            .map(ImmutableList::copyOf)
+            .collect(Guavate.toImmutableList());
+        assertThat(values)
+            .isEqualTo(ImmutableList.of(
+                ImmutableList.of(1, 2, 3),
+                ImmutableList.of(4)));
+    }
+
+    @Test
+    public void chunkerShouldChunkInSeveralBuckets() {
+        Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5, 6, 7);
+
+        List<List<Integer>> values = stream.collect(JamesCollectors.chunker(3))
+            .values()
+            .stream()
+            .map(ImmutableList::copyOf)
+            .collect(Guavate.toImmutableList());
+        assertThat(values)
+            .isEqualTo(ImmutableList.of(
+                ImmutableList.of(1, 2, 3),
+                ImmutableList.of(4, 5, 6),
+                ImmutableList.of(7)));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


[2/2] james-project git commit: JAMES-1945 Cassandra : Use chunker while fetching messages

Posted by bt...@apache.org.
JAMES-1945 Cassandra : Use chunker while fetching messages

This removes the limitation of 65336 messages retieved maximum


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e2533584
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e2533584
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e2533584

Branch: refs/heads/master
Commit: e25335842d565e4db2b72ce288b65ea6c7180c65
Parents: d43ce78
Author: benwa <bt...@linagora.com>
Authored: Wed Feb 22 11:00:32 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Feb 23 16:18:25 2017 +0700

----------------------------------------------------------------------
 .../mailbox/cassandra/mail/CassandraMessageDAO.java   | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/e2533584/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 1ac38e5..48828ef 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -47,6 +47,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -76,6 +77,8 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleProperty;
+import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.util.streams.JamesCollectors;
 
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
@@ -98,6 +101,7 @@ import com.google.common.primitives.Bytes;
 
 public class CassandraMessageDAO {
 
+    public static final int CHUNK_SIZE_ON_READ = 5000;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final CassandraTypesProvider typesProvider;
     private final Factory messageIdFactory;
@@ -182,8 +186,14 @@ public class CassandraMessageDAO {
     }
 
     public CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Optional<Integer> limit) {
-        return retrieveRows(messageIds, fetchType, limit)
-                .thenApply(resultSet -> toMessagesWithAttachmentRepresentation(messageIds, fetchType, resultSet));
+        return CompletableFutureUtil.allOf(
+            messageIds.stream()
+                .collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ))
+                .values()
+                .stream()
+                .map(ids -> retrieveRows(ids, fetchType, limit)
+                    .thenApply(resultSet -> toMessagesWithAttachmentRepresentation(messageIds, fetchType, resultSet))))
+            .thenApply(stream -> stream.flatMap(Function.identity()));
     }
 
     private Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> toMessagesWithAttachmentRepresentation(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, ResultSet resultSet) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org