You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 02:30:04 UTC
[james-project] 06/10: JAMES-2927 Prevent unwanted tumbstone
creation for ExecutionDetails projection
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b49cceed1d41c8f5aa2f07bbf0b0557327021a0b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Oct 21 17:23:54 2019 +0700
JAMES-2927 Prevent unwanted tumbstone creation for ExecutionDetails projection
A tumbstone was created when a null value is specified in a prepared
statement.
This basically happened several time a "task details" get saved - as a
task cannot be both failed and completed.
This is due to the fact that null has the meaning `remove` and not the meaning `unspecified`, which is represented by no binding at all.
Of course unwanted tumbstones occurs with a performance cost.
The recommended method for fixing on the latest version of cassandra is to not bind the null value.
Read this for further information: https://thelastpickle.com/blog/2016/09/15/Null-bindings-on-prepared-statements-and-undesired-tombstone-creation.html
---
...assandraTaskExecutionDetailsProjectionDAO.scala | 51 ++++++++++++++++------
1 file changed, 38 insertions(+), 13 deletions(-)
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
index fad2b48..3c69a62 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -22,15 +22,17 @@ import java.util.Optional
import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{bindMarker, insertInto, select}
-import com.datastax.driver.core.{Row, Session}
+import com.datastax.driver.core.{BoundStatement, Row, Session, UDTValue}
import javax.inject.Inject
import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule}
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer
+import org.apache.james.task._
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._
-import org.apache.james.task.{Hostname, TaskExecutionDetails, TaskId, TaskManager, TaskType}
import reactor.core.publisher.{Flux, Mono}
+import scala.compat.java8.OptionConverters._
+
class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typesProvider: CassandraTypesProvider, jsonTaskAdditionalInformationSerializer: JsonTaskAdditionalInformationSerializer) {
private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
private val dateType = typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
@@ -55,20 +57,44 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ
private val listStatement = session.prepare(select().from(TABLE_NAME))
- def saveDetails(details: TaskExecutionDetails): Mono[Void] = cassandraAsyncExecutor.executeVoid(
- insertStatement.bind
+ def saveDetails(details: TaskExecutionDetails): Mono[Void] = {
+ val boundStatement = insertStatement.bind
.setUUID(TASK_ID, details.getTaskId.getValue)
.setString(TYPE, details.getType.asString())
.setString(STATUS, details.getStatus.getValue)
.setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate))
.setString(SUBMITTED_NODE, details.getSubmittedNode.asString)
- .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))
- .setString(RAN_NODE, details.getRanNode.map[String](_.asString).orElse(null))
- .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null))
- .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null))
- .setString(CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString).orElse(null))
- .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))
- .setString(ADDITIONAL_INFORMATION, serializeAdditionalInformation(details).orElse(null)))
+
+ val bindOptionalFieldOperations = List(
+ (statement: BoundStatement) => bindOptionalUDTValue(statement, STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate)),
+ (statement: BoundStatement) => bindOptionalStringValue(statement, RAN_NODE, details.getRanNode.map[String](_.asString)),
+ (statement: BoundStatement) => bindOptionalUDTValue(statement, COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate)),
+ (statement: BoundStatement) => bindOptionalUDTValue(statement, CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate)),
+ (statement: BoundStatement) => bindOptionalStringValue(statement, CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString)),
+ (statement: BoundStatement) => bindOptionalUDTValue(statement, FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getFailedDate)),
+ (statement: BoundStatement) => bindOptionalStringValue(statement, ADDITIONAL_INFORMATION, serializeAdditionalInformation(details)),
+ )
+
+ val fullyBoundStatement = bindOptionalFieldOperations.foldLeft(boundStatement)((statement, bindFieldOperation) => {
+ bindFieldOperation(statement)
+ })
+
+ cassandraAsyncExecutor.executeVoid(fullyBoundStatement);
+ }
+
+ private def bindOptionalStringValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[String]) = {
+ fieldValue.asScala match {
+ case Some(value) => statement.setString(fieldName, value)
+ case None => statement
+ }
+ }
+
+ private def bindOptionalUDTValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[UDTValue]) = {
+ fieldValue.asScala match {
+ case Some(value) => statement.setUDTValue(fieldName, value)
+ case None => statement
+ }
+ }
private def serializeAdditionalInformation(details: TaskExecutionDetails): Optional[String] = details
.getAdditionalInformation
@@ -96,8 +122,7 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ
canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)),
cancelRequestedNode = Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)),
failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)),
- additionalInformation = () => deserializeAdditionalInformation(taskType, row),
- )
+ additionalInformation = () => deserializeAdditionalInformation(taskType, row))
}
private def deserializeAdditionalInformation(taskType: TaskType, row: Row): Optional[TaskExecutionDetails.AdditionalInformation] = {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org