You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/01 07:22:26 UTC
[james-project] 04/09: JAMES-2813 Add
CassandraTaskExecutionDetailsProjection DAO and Module
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 845aafe989b851198fd2bf3326b074455d08ccad
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Mon Jul 29 15:44:21 2019 +0200
JAMES-2813 Add CassandraTaskExecutionDetailsProjection DAO and Module
---
server/task-cassandra/pom.xml | 15 +++
...assandraTaskExecutionDetailsProjectionDAO.scala | 85 +++++++++++++++
...andraTaskExecutionDetailsProjectionModule.scala | 57 ++++++++++
...andraTaskExecutionDetailsProjectionDAOTest.java | 118 +++++++++++++++++++++
4 files changed, 275 insertions(+)
diff --git a/server/task-cassandra/pom.xml b/server/task-cassandra/pom.xml
index 12852bc..1200ea1 100644
--- a/server/task-cassandra/pom.xml
+++ b/server/task-cassandra/pom.xml
@@ -34,6 +34,16 @@
<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-backends-cassandra</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
+ <artifactId>apache-james-backends-cassandra</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${james.groupId}</groupId>
<artifactId>apache-james-mailbox-api</artifactId>
<scope>test</scope>
</dependency>
@@ -113,6 +123,11 @@
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_${scala.base}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
new file mode 100644
index 0000000..1a46fa4
--- /dev/null
+++ b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -0,0 +1,85 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
+package org.apache.james.task.eventsourcing.cassandra
+
+import java.util.Optional
+
+import javax.inject.Inject
+
+import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule}
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._
+import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManager}
+
+import com.datastax.driver.core.querybuilder.QueryBuilder
+import com.datastax.driver.core.querybuilder.QueryBuilder.{bindMarker, insertInto, select}
+import com.datastax.driver.core.{Row, Session}
+import reactor.core.publisher.{Flux, Mono}
+
+/**
+ * Cassandra-backed storage for the projection of [[TaskExecutionDetails]].
+ *
+ * Each task is stored as one row of `TABLE_NAME`, keyed by its task id. The persisted columns are the
+ * task type, its status and its five lifecycle dates. `additionalInformation` has no column: it is not
+ * written by [[saveDetails]] and is always read back as `Optional.empty`.
+ *
+ * `@Inject` is attached to the primary constructor (`class X @Inject()(...)`). Written in front of the
+ * `class` keyword it annotates the class itself rather than the constructor.
+ *
+ * @param session       Cassandra session used to prepare and execute the statements
+ * @param typesProvider provider used to look up the zoned-date-time user defined type
+ */
+class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typesProvider: CassandraTypesProvider) {
+
+  private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
+  private val dateType = typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
+
+  // Statements are prepared once, when the DAO is constructed, and bound on every call.
+  private val insertStatement = session.prepare(insertInto(TABLE_NAME)
+    .value(TASK_ID, bindMarker(TASK_ID))
+    .value(TYPE, bindMarker(TYPE))
+    .value(STATUS, bindMarker(STATUS))
+    .value(SUBMITTED_DATE, bindMarker(SUBMITTED_DATE))
+    .value(STARTED_DATE, bindMarker(STARTED_DATE))
+    .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE))
+    .value(CANCELED_DATE, bindMarker(CANCELED_DATE))
+    .value(FAILED_DATE, bindMarker(FAILED_DATE)))
+
+  private val selectStatement = session.prepare(select().from(TABLE_NAME)
+    .where(QueryBuilder.eq(TASK_ID, bindMarker(TASK_ID))))
+
+  private val listStatement = session.prepare(select().from(TABLE_NAME))
+
+  /**
+   * Writes the row for `details.getTaskId`. Saving again for the same task id replaces the
+   * previously stored values.
+   *
+   * A date that is absent is bound as `null`.
+   *
+   * @param details the task execution details to persist
+   * @return a `Mono` completing once the insert has been executed
+   */
+  def saveDetails(details : TaskExecutionDetails): Mono[Void] = cassandraAsyncExecutor.executeVoid(
+    insertStatement.bind()
+      .setUUID(TASK_ID, details.getTaskId.getValue)
+      .setString(TYPE, details.getType)
+      .setString(STATUS, details.getStatus.getValue)
+      .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmitDate).orElse(null))
+      .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))
+      .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null))
+      .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null))
+      // The failed date column must be fed from the failed date, not from the started date.
+      .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getFailedDate).orElse(null)))
+
+  /**
+   * Reads the row stored for `taskId`.
+   *
+   * @param taskId identifier of the task to look up
+   * @return a `Mono` emitting the stored details, or completing empty when no row exists for this id
+   */
+  def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] = cassandraAsyncExecutor
+    .executeSingleRow(selectStatement.bind().setUUID(TASK_ID, taskId.getValue))
+    .map(readRow)
+
+  /**
+   * Reads every row of the projection table.
+   *
+   * @return a `Flux` emitting one [[TaskExecutionDetails]] per stored row
+   */
+  def listDetails(): Flux[TaskExecutionDetails] = cassandraAsyncExecutor
+    .executeRows(listStatement.bind())
+    .map(readRow)
+
+  /** Rebuilds a [[TaskExecutionDetails]] from a row; `additionalInformation` is not stored, hence empty. */
+  private def readRow(row: Row): TaskExecutionDetails = new TaskExecutionDetails(
+    taskId = TaskId.fromUUID(row.getUUID(TASK_ID)),
+    `type` = row.getString(TYPE),
+    status = TaskManager.Status.fromString(row.getString(STATUS)),
+    submitDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)),
+    startedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(STARTED_DATE)),
+    completedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(COMPLETED_DATE)),
+    canceledDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(CANCELED_DATE)),
+    failedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(FAILED_DATE)),
+    additionalInformation = Optional.empty)
+}
diff --git a/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
new file mode 100644
index 0000000..698a5d7
--- /dev/null
+++ b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
@@ -0,0 +1,57 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
+package org.apache.james.task.eventsourcing.cassandra
+
+import com.datastax.driver.core.DataType.{text, uuid}
+import com.datastax.driver.core.schemabuilder.{Create, SchemaBuilder}
+import org.apache.james.backends.cassandra.components.CassandraModule
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule
+
+/**
+ * Names of the Cassandra table and columns that hold the TaskExecutionDetails projection.
+ */
+object CassandraTaskExecutionDetailsProjectionTable {
+  // Table
+  val TABLE_NAME: String = "taskExecutionDetailsProjection"
+
+  // Identity and state columns
+  val TASK_ID: String = "taskID"
+  val TYPE: String = "type"
+  val STATUS: String = "status"
+
+  // Lifecycle date columns
+  val SUBMITTED_DATE: String = "submittedDate"
+  val STARTED_DATE: String = "startedDate"
+  val COMPLETED_DATE: String = "completedDate"
+  val CANCELED_DATE: String = "canceledDate"
+  val FAILED_DATE: String = "failedDate"
+}
+
+/**
+ * Cassandra schema definition of the table holding the TaskExecutionDetails projection:
+ * the task id is the partition key, type and status are text columns, and each lifecycle
+ * date is a frozen zoned-date-time user defined type column.
+ */
+object CassandraTaskExecutionDetailsProjectionModule {
+
+  import CassandraTaskExecutionDetailsProjectionTable._
+
+  // A def rather than a val: every date column receives its own freshly built type.
+  private def frozenZonedDateTime = SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
+
+  val MODULE: CassandraModule = CassandraModule.table(TABLE_NAME)
+    .comment("Projection of TaskExecutionDetails used by the distributed task manager")
+    .options((tableOptions: Create.Options) => tableOptions
+      .caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.noRows()))
+    .statement((table: Create) => table
+      .addPartitionKey(TASK_ID, uuid)
+      .addColumn(TYPE, text)
+      .addColumn(STATUS, text)
+      .addUDTColumn(SUBMITTED_DATE, frozenZonedDateTime)
+      .addUDTColumn(STARTED_DATE, frozenZonedDateTime)
+      .addUDTColumn(COMPLETED_DATE, frozenZonedDateTime)
+      .addUDTColumn(CANCELED_DATE, frozenZonedDateTime)
+      .addUDTColumn(FAILED_DATE, frozenZonedDateTime))
+    .build
+}
diff --git a/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java
new file mode 100644
index 0000000..fc6066b
--- /dev/null
+++ b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java
@@ -0,0 +1,118 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
+
+package org.apache.james.task.eventsourcing.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.ZonedDateTime;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Tests saving, reading and listing of {@link TaskExecutionDetails} through
+ * {@link CassandraTaskExecutionDetailsProjectionDAO} against the Cassandra cluster provided by
+ * {@link CassandraClusterExtension}.
+ *
+ * NOTE(review): every date fixture below is {@code Optional.empty()}, so the date columns are
+ * never written with a real value by these tests; date round-tripping is not covered here.
+ */
+class CassandraTaskExecutionDetailsProjectionDAOTest {
+
+    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+    private static final TaskId TASK_ID_2 = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafe");
+
+    private static final Optional<TaskExecutionDetails.AdditionalInformation> ADDITIONAL_INFORMATION = Optional.empty();
+    private static final Optional<ZonedDateTime> SUBMIT_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> STARTED_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> COMPLETED_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> CANCELLED_DATE = Optional.empty();
+    private static final Optional<ZonedDateTime> FAILED_DATE = Optional.empty();
+
+    // Each fixture passes SUBMIT_DATE in the submit-date position, so the arguments line up
+    // with the constructor parameters they are named after.
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS = new TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION,
+        TaskManager.Status.COMPLETED, SUBMIT_DATE, STARTED_DATE, COMPLETED_DATE, CANCELLED_DATE, FAILED_DATE);
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS_2 = new TaskExecutionDetails(TASK_ID_2, "type", ADDITIONAL_INFORMATION,
+        TaskManager.Status.COMPLETED, SUBMIT_DATE, STARTED_DATE, COMPLETED_DATE, CANCELLED_DATE, FAILED_DATE);
+    // Same task id as TASK_EXECUTION_DETAILS, with a different status.
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS_UPDATED = new TaskExecutionDetails(TASK_ID, "type", ADDITIONAL_INFORMATION,
+        TaskManager.Status.FAILED, SUBMIT_DATE, STARTED_DATE, COMPLETED_DATE, CANCELLED_DATE, FAILED_DATE);
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+        CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraZonedDateTimeModule.MODULE, CassandraTaskExecutionDetailsProjectionModule.MODULE()));
+
+    private CassandraTaskExecutionDetailsProjectionDAO testee;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        testee = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
+    }
+
+    @Test
+    void readDetailsShouldBeAbleToRetrieveASavedRecord() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+
+        TaskExecutionDetails taskExecutionDetails = testee.readDetails(TASK_ID).block();
+        assertThat(taskExecutionDetails).isEqualTo(TASK_EXECUTION_DETAILS);
+    }
+
+    @Test
+    void saveDetailsShouldUpdateRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+
+        testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED).block();
+
+        TaskExecutionDetails taskExecutionDetails = testee.readDetails(TASK_ID).block();
+        assertThat(taskExecutionDetails).isEqualTo(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+
+    @Test
+    void readDetailsShouldReturnEmptyWhenNone() {
+        Optional<TaskExecutionDetails> taskExecutionDetails = testee.readDetails(TASK_ID).blockOptional();
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listDetailsShouldReturnEmptyWhenNone() {
+        Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listDetailsShouldReturnAllRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+        testee.saveDetails(TASK_EXECUTION_DETAILS_2).block();
+
+        Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS, TASK_EXECUTION_DETAILS_2);
+    }
+
+    @Test
+    void listDetailsShouldReturnLastUpdatedRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS).block();
+        testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED).block();
+
+        Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org