You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2019/11/04 02:30:04 UTC

[james-project] 06/10: JAMES-2927 Prevent unwanted tumbstone creation for ExecutionDetails projection

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b49cceed1d41c8f5aa2f07bbf0b0557327021a0b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Oct 21 17:23:54 2019 +0700

    JAMES-2927 Prevent unwanted tumbstone creation for ExecutionDetails projection
    
    A tumbstone was created when a null value is specified in a prepared
    statement.
    
    This basically happened several time a "task details" get saved - as a
    task cannot be both failed and completed.
    
    This is due to the fact that null has the meaning `remove` and not the meaning `unspecified`, which is represented by no binding at all.
    
    Of course unwanted tumbstones occurs with a performance cost.
    
    The recommended method for fixing on the latest version of cassandra is to not bind the null value.
    
    Read this for further information: https://thelastpickle.com/blog/2016/09/15/Null-bindings-on-prepared-statements-and-undesired-tombstone-creation.html
---
 ...assandraTaskExecutionDetailsProjectionDAO.scala | 51 ++++++++++++++++------
 1 file changed, 38 insertions(+), 13 deletions(-)

diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
index fad2b48..3c69a62 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -22,15 +22,17 @@ import java.util.Optional
 
 import com.datastax.driver.core.querybuilder.QueryBuilder
 import com.datastax.driver.core.querybuilder.QueryBuilder.{bindMarker, insertInto, select}
-import com.datastax.driver.core.{Row, Session}
+import com.datastax.driver.core.{BoundStatement, Row, Session, UDTValue}
 import javax.inject.Inject
 import org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule}
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
 import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer
+import org.apache.james.task._
 import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._
-import org.apache.james.task.{Hostname, TaskExecutionDetails, TaskId, TaskManager, TaskType}
 import reactor.core.publisher.{Flux, Mono}
 
+import scala.compat.java8.OptionConverters._
+
 class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typesProvider: CassandraTypesProvider, jsonTaskAdditionalInformationSerializer: JsonTaskAdditionalInformationSerializer) {
   private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
   private val dateType = typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)
@@ -55,20 +57,44 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ
 
   private val listStatement = session.prepare(select().from(TABLE_NAME))
 
-  def saveDetails(details: TaskExecutionDetails): Mono[Void] = cassandraAsyncExecutor.executeVoid(
-    insertStatement.bind
+  def saveDetails(details: TaskExecutionDetails): Mono[Void] = {
+    val boundStatement =  insertStatement.bind
       .setUUID(TASK_ID, details.getTaskId.getValue)
       .setString(TYPE, details.getType.asString())
       .setString(STATUS, details.getStatus.getValue)
       .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate))
       .setString(SUBMITTED_NODE, details.getSubmittedNode.asString)
-      .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))
-      .setString(RAN_NODE, details.getRanNode.map[String](_.asString).orElse(null))
-      .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null))
-      .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null))
-      .setString(CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString).orElse(null))
-      .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))
-      .setString(ADDITIONAL_INFORMATION, serializeAdditionalInformation(details).orElse(null)))
+
+    val bindOptionalFieldOperations = List(
+      (statement: BoundStatement) => bindOptionalUDTValue(statement, STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate)),
+      (statement: BoundStatement) => bindOptionalStringValue(statement, RAN_NODE, details.getRanNode.map[String](_.asString)),
+      (statement: BoundStatement) => bindOptionalUDTValue(statement, COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate)),
+      (statement: BoundStatement) => bindOptionalUDTValue(statement, CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate)),
+      (statement: BoundStatement) => bindOptionalStringValue(statement, CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString)),
+      (statement: BoundStatement) => bindOptionalUDTValue(statement, FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getFailedDate)),
+      (statement: BoundStatement) => bindOptionalStringValue(statement, ADDITIONAL_INFORMATION, serializeAdditionalInformation(details)),
+    )
+
+    val fullyBoundStatement = bindOptionalFieldOperations.foldLeft(boundStatement)((statement, bindFieldOperation) => {
+      bindFieldOperation(statement)
+    })
+
+    cassandraAsyncExecutor.executeVoid(fullyBoundStatement);
+  }
+
+  private def bindOptionalStringValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[String]) = {
+    fieldValue.asScala match {
+      case Some(value) => statement.setString(fieldName, value)
+      case None => statement
+    }
+  }
+
+  private def bindOptionalUDTValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[UDTValue]) = {
+    fieldValue.asScala match {
+      case Some(value) => statement.setUDTValue(fieldName, value)
+      case None => statement
+    }
+  }
 
   private def serializeAdditionalInformation(details: TaskExecutionDetails): Optional[String] = details
     .getAdditionalInformation
@@ -96,8 +122,7 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ
       canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)),
       cancelRequestedNode = Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)),
       failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)),
-      additionalInformation = () => deserializeAdditionalInformation(taskType, row),
-    )
+      additionalInformation = () => deserializeAdditionalInformation(taskType, row))
   }
 
   private def deserializeAdditionalInformation(taskType: TaskType, row: Row): Optional[TaskExecutionDetails.AdditionalInformation] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org