You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/01 07:22:26 UTC

[james-project] 04/09: JAMES-2813 Add CassandraTaskExecutionDetailsProjection DAO and Module

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 845aafe989b851198fd2bf3326b074455d08ccad
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Mon Jul 29 15:44:21 2019 +0200

    JAMES-2813 Add CassandraTaskExecutionDetailsProjection DAO and Module
---
 server/task-cassandra/pom.xml                      |  15 +++
 ...assandraTaskExecutionDetailsProjectionDAO.scala |  85 +++++++++++++++
 ...andraTaskExecutionDetailsProjectionModule.scala |  57 ++++++++++
 ...andraTaskExecutionDetailsProjectionDAOTest.java | 118 +++++++++++++++++++++
 4 files changed, 275 insertions(+)

diff --git a/server/task-cassandra/pom.xml b/server/task-cassandra/pom.xml
index 12852bc..1200ea1 100644
--- a/server/task-cassandra/pom.xml
+++ b/server/task-cassandra/pom.xml
@@ -34,6 +34,16 @@
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-cassandra</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-james-backends-cassandra</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>apache-james-mailbox-api</artifactId>
             <scope>test</scope>
         </dependency>
@@ -113,6 +123,11 @@
             <groupId>org.scala-lang.modules</groupId>
             <artifactId>scala-java8-compat_${scala.base}</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
new file mode 100644
index 0000000..1a46fa4
--- /dev/null
+++ b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -0,0 +1,85 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+package org.apache.james.task.eventsourcing.cassandra
+
+import java.util.Optional
+
+import javax.inject.Inject
+
+import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule}
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._
+import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManager}
+
+import com.datastax.driver.core.querybuilder.QueryBuilder
+import com.datastax.driver.core.querybuilder.QueryBuilder.{bindMarker, insertInto, select}
+import com.datastax.driver.core.{Row, Session}
+import reactor.core.publisher.{Flux, Mono}
+
+/** Cassandra DAO for the TaskExecutionDetails projection table: save, read by task id, list all. */
+class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typesProvider: CassandraTypesProvider) {
+  // @Inject() is written on the primary constructor so the constructor itself carries the annotation.
+  private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
+  private val dateType = typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
+
+  private val insertStatement = session.prepare(insertInto(TABLE_NAME)
+    .value(TASK_ID, bindMarker(TASK_ID))
+    .value(TYPE, bindMarker(TYPE))
+    .value(STATUS, bindMarker(STATUS))
+    .value(SUBMITTED_DATE, bindMarker(SUBMITTED_DATE))
+    .value(STARTED_DATE, bindMarker(STARTED_DATE))
+    .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE))
+    .value(CANCELED_DATE, bindMarker(CANCELED_DATE))
+    .value(FAILED_DATE, bindMarker(FAILED_DATE)))
+
+  private val selectStatement = session.prepare(select().from(TABLE_NAME)
+    .where(QueryBuilder.eq(TASK_ID, bindMarker(TASK_ID))))
+
+  private val listStatement = session.prepare(select().from(TABLE_NAME))
+  /** Writes every column of the row keyed by the task id of `details`; an absent date is bound as null. */
+  def saveDetails(details : TaskExecutionDetails): Mono[Void] = cassandraAsyncExecutor.executeVoid(
+    insertStatement.bind
+      .setUUID(TASK_ID, details.getTaskId.getValue)
+      .setString(TYPE, details.getType)
+      .setString(STATUS, details.getStatus.getValue)
+      .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmitDate).orElse(null))
+      .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))
+      .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null))
+      .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null))
+      .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getFailedDate).orElse(null)))
+  /** Selects the row whose task id equals `taskId` and converts it with `readRow`. */
+  def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = cassandraAsyncExecutor
+    .executeSingleRow(selectStatement.bind().setUUID(TASK_ID, taskId.getValue))
+    .map(readRow)
+  /** Selects every row of the table (no WHERE clause) and converts each one with `readRow`. */
+  def listDetails(): Flux[TaskExecutionDetails] = cassandraAsyncExecutor
+    .executeRows(listStatement.bind())
+    .map(readRow)
+  // The table has no column for additionalInformation, so it is always read back as Optional.empty.
+  private def readRow(row: Row): TaskExecutionDetails = new TaskExecutionDetails(
+    taskId = TaskId.fromUUID(row.getUUID(TASK_ID)),
+    `type` = row.getString(TYPE),
+    status = TaskManager.Status.fromString(row.getString(STATUS)),
+    submitDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)),
+    startedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(STARTED_DATE)),
+    completedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(COMPLETED_DATE)),
+    canceledDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(CANCELED_DATE)),
+    failedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(FAILED_DATE)),
+    additionalInformation = Optional.empty)
+}
diff --git a/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
new file mode 100644
index 0000000..698a5d7
--- /dev/null
+++ b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
@@ -0,0 +1,57 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+package org.apache.james.task.eventsourcing.cassandra
+
+import com.datastax.driver.core.DataType.{text, uuid}
+import com.datastax.driver.core.schemabuilder.{Create, SchemaBuilder}
+import org.apache.james.backends.cassandra.components.CassandraModule
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule
+
+object CassandraTaskExecutionDetailsProjectionTable { // table and column names shared by the DAO statements and the table definition
+  val TABLE_NAME: String = "taskExecutionDetailsProjection"
+  // Column names. TASK_ID is declared as the partition key; the five *_DATE columns are zoned-date-time UDT columns.
+  val TASK_ID: String = "taskID"
+  val TYPE: String = "type"
+  val STATUS: String = "status"
+  val SUBMITTED_DATE: String = "submittedDate"
+  val STARTED_DATE: String = "startedDate"
+  val COMPLETED_DATE: String = "completedDate"
+  val CANCELED_DATE: String = "canceledDate"
+  val FAILED_DATE: String = "failedDate"
+}
+
+/** Declares the Cassandra table that backs the task execution details projection. */
+object CassandraTaskExecutionDetailsProjectionModule {
+  import CassandraTaskExecutionDetailsProjectionTable._
+  // All five date columns use the same frozen zoned-date-time user type.
+  private def zonedDateTime = SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
+  val MODULE: CassandraModule = CassandraModule.table(TABLE_NAME)
+    .comment("Projection of TaskExecutionDetails used by the distributed task manager")
+    .options((opts: Create.Options) => opts.caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.noRows()))
+    .statement((create: Create) => create
+      .addPartitionKey(TASK_ID, uuid)
+      .addColumn(TYPE, text)
+      .addColumn(STATUS, text)
+      .addUDTColumn(SUBMITTED_DATE, zonedDateTime)
+      .addUDTColumn(STARTED_DATE, zonedDateTime)
+      .addUDTColumn(COMPLETED_DATE, zonedDateTime)
+      .addUDTColumn(CANCELED_DATE, zonedDateTime)
+      .addUDTColumn(FAILED_DATE, zonedDateTime))
+    .build
+}
diff --git a/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java
new file mode 100644
index 0000000..fc6066b
--- /dev/null
+++ b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java
@@ -0,0 +1,118 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+
+package org.apache.james.task.eventsourcing.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.ZonedDateTime;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class CassandraTaskExecutionDetailsProjectionDAOTest {
+    // Save/read/list round-trip tests for the DAO, run against the CassandraCluster supplied by the registered extension.
+    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+    private static final TaskId TASK_ID_2 = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafe");
+    // Every optional field is empty in the fixtures below; only the task id and the status differ between them.
+    private static final Optional<TaskExecutionDetails.AdditionalInformation> ADDITIONAL_INFORMATION = Optional.empty();
+    private static final Optional<ZonedDateTime> SUBMIT_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> STARTED_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> COMPLETED_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> CANCELLED_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> FAILED_DATE = Optional.empty();
+
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS =  new TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION,
+        TaskManager.Status.COMPLETED, SUBMIT_DATE, STARTED_DATE, COMPLETED_DATE, CANCELLED_DATE, FAILED_DATE);
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS_2 =  new TaskExecutionDetails(TASK_ID_2, "type", ADDITIONAL_INFORMATION,
+        TaskManager.Status.COMPLETED, SUBMIT_DATE, STARTED_DATE, COMPLETED_DATE, CANCELLED_DATE, FAILED_DATE);
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS_UPDATED =  new TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION,
+        TaskManager.Status.FAILED, SUBMIT_DATE, STARTED_DATE, COMPLETED_DATE, CANCELLED_DATE, FAILED_DATE);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+            CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraZonedDateTimeModule.MODULE, CassandraTaskExecutionDetailsProjectionModule.MODULE()));
+
+    private CassandraTaskExecutionDetailsProjectionDAO testee;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        testee = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
+    }
+
+    @Test
+    void readDetailsShouldBeAbleToRetrieveASavedRecord() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+
+        TaskExecutionDetails taskExecutionDetails = testee.readDetails(TASK_ID).block();
+        assertThat(taskExecutionDetails).isEqualTo(TASK_EXECUTION_DETAILS);
+    }
+
+    @Test
+    void saveDetailsShouldUpdateRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+
+        testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED).block();
+
+        TaskExecutionDetails taskExecutionDetails = testee.readDetails(TASK_ID).block();
+        assertThat(taskExecutionDetails).isEqualTo(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+
+    @Test
+    void readDetailsShouldReturnEmptyWhenNone() {
+        Optional<TaskExecutionDetails> taskExecutionDetails = testee.readDetails(TASK_ID).blockOptional();
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listDetailsShouldReturnEmptyWhenNone() {
+        Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listDetailsShouldReturnAllRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+        testee.saveDetails(TASK_EXECUTION_DETAILS_2).block();
+
+        Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS, TASK_EXECUTION_DETAILS_2);
+    }
+
+    @Test
+    void listDetailsShouldReturnLastUpdatedRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+        testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED).block();
+
+        Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org