Posted to notifications@james.apache.org by GitBox <gi...@apache.org> on 2022/09/27 06:31:02 UTC

[GitHub] [james-project] vttranlina opened a new pull request, #1208: CassandraEventStore support removeBefore method for remove old data

vttranlina opened a new pull request, #1208:
URL: https://github.com/apache/james-project/pull/1208

   Why: Prevents unwanted infinite data growth


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[GitHub] [james-project] vttranlina commented on a diff in pull request #1208: CassandraEventStore support removeBefore method for remove old data

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r980810789


##########
event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala:
##########
@@ -38,7 +39,16 @@ class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends Event
 
   private def doAppendAll(events: Iterable[Event]): SMono[Void] = {
     Preconditions.checkArgument(Event.belongsToSameAggregate(events))
-    eventStoreDao.appendAll(events)
+
+    val preconditionPublisher = SFlux.fromIterable(events)

Review Comment:
   This precondition is needed for `CassandraEventStoreTest#appendShouldThrowWhenTryingToRewriteHistory`.
   It will be heavier as long as we do not have an additional table.
   Any better ideas are welcome.
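One reading of the precondition discussed above, as a minimal sketch: check every incoming event before writing, and reject the whole batch if any of them already exists. `InMemoryHistoryGuard` and its methods are hypothetical stand-ins, not the James `EventStoreDao` API. The in-memory map replaces the per-event Cassandra lookup that makes the real check costly.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for an event store (not the James API).
final class InMemoryHistoryGuard {
    // Aggregate id -> ids of the events already stored for that aggregate.
    private final Map<String, Set<Integer>> storedEventIds = new HashMap<>();

    // Appends the batch only if none of its event ids is already stored.
    void appendAll(String aggregateId, List<Integer> eventIds) {
        Set<Integer> known = storedEventIds.computeIfAbsent(aggregateId, id -> new HashSet<>());
        // Precondition: one existence check per incoming event, before any write.
        for (Integer eventId : eventIds) {
            if (known.contains(eventId)) {
                throw new IllegalStateException("Event " + eventId + " of " + aggregateId + " already exists");
            }
        }
        known.addAll(eventIds);
    }

    boolean contains(String aggregateId, int eventId) {
        return storedEventIds.getOrDefault(aggregateId, Set.of()).contains(eventId);
    }
}
```

Because the check runs over the whole batch before the first write, a rejected batch leaves the stored history untouched.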





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981890320


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/routes/TasksCleanupRoutes.java:
##########
@@ -0,0 +1,64 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.routes;
+
+import java.time.Clock;
+
+import javax.inject.Inject;
+
+import org.apache.james.task.TaskManager;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.services.TasksCleanupService;
+import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
+import org.apache.james.webadmin.tasks.TasksCleanupTaskRegistration;
+import org.apache.james.webadmin.utils.JsonTransformer;
+
+import spark.Service;
+
+public class TasksCleanupRoutes implements Routes {

Review Comment:
   Should be in webadmin-core IMO





[GitHub] [james-project] vttranlina commented on pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
vttranlina commented on PR #1208:
URL: https://github.com/apache/james-project/pull/1208#issuecomment-1264375189

   > Issue with injections:
   
   This is a circular dependency issue.
   I propose that we move `EventSourcingTaskManager#remove` outside of the task manager (a revert of https://github.com/apache/james-project/pull/1208#discussion_r982203161),
   or create an interface for `JsonTaskSerializer` (so that it can be proxied, but that is more complicated).
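To illustrate the second option, here is a minimal hand-written sketch of what proxying through an interface buys: a placeholder that implements the interface can be handed out before the real object exists, which is what breaks the construction cycle. `TaskSerializer` and `LazyTaskSerializer` are hypothetical names, not James or Guice classes, and a `Supplier` stands in for the injector.

```java
import java.util.function.Supplier;

// Hypothetical interface extracted from a concrete serializer class.
interface TaskSerializer {
    String serialize(String taskType);
}

// Hand-written equivalent of a proxy: it resolves the real serializer on
// first use, so it can be handed out before the real one is constructed.
final class LazyTaskSerializer implements TaskSerializer {
    private final Supplier<TaskSerializer> delegateSupplier;
    private TaskSerializer delegate;

    LazyTaskSerializer(Supplier<TaskSerializer> delegateSupplier) {
        this.delegateSupplier = delegateSupplier;
    }

    @Override
    public synchronized String serialize(String taskType) {
        if (delegate == null) {
            delegate = delegateSupplier.get();
        }
        return delegate.serialize(taskType);
    }
}
```

The cost is an extra interface and a level of indirection, which is presumably the added complexity mentioned above.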
   
   




[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r983069223


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/services/TasksCleanupService.java:
##########
@@ -0,0 +1,113 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.services;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.task.Task;
+import org.apache.james.task.eventsourcing.TaskAggregateId;
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class TasksCleanupService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TasksCleanupService.class);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long removedTasksCount;
+            private final long processedTaskCount;
+
+            public Snapshot(long removedTasksCount, long processedTaskCount) {
+                this.removedTasksCount = removedTasksCount;
+                this.processedTaskCount = processedTaskCount;
+            }
+
+            public long getRemovedTasksCount() {
+                return removedTasksCount;
+            }
+
+            public long getProcessedTaskCount() {
+                return processedTaskCount;
+            }
+        }
+
+        private final AtomicLong removedTasksCount;
+        private final AtomicLong processedTaskCount;
+
+        public Context() {
+            removedTasksCount = new AtomicLong();
+            processedTaskCount = new AtomicLong();
+        }
+
+        void incrementRemovedTasksCount() {
+            removedTasksCount.incrementAndGet();
+        }
+
+        void incrementProcessedTaskCount() {
+            processedTaskCount.incrementAndGet();
+        }
+
+        void incrementRemovedTasksCount(int count) {
+            removedTasksCount.set(removedTasksCount.get() + count);
+        }
+
+        public Snapshot snapshot() {
+            return new Snapshot(removedTasksCount.get(), processedTaskCount.get());
+        }
+    }
+
+    private final EventStore eventStore;
+    private final TaskExecutionDetailsProjection taskExecutionDetailsProjection;
+
+    @Inject
+    public TasksCleanupService(EventStore eventStore, TaskExecutionDetailsProjection taskExecutionDetailsProjection) {
+        this.eventStore = eventStore;
+        this.taskExecutionDetailsProjection = taskExecutionDetailsProjection;
+    }
+
+    public Mono<Task.Result> removeBeforeDate(Instant beforeDate, Context context) {
+        return Flux.from(taskExecutionDetailsProjection.listDetailsByBeforeDate(beforeDate))
+            .doOnNext(oldTaskDetail -> context.incrementProcessedTaskCount())

Review Comment:
   We could have a `@PreDestroy` step that attempts to cancel the task upon ordered shutdown?
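A minimal sketch of such a shutdown step, under the assumption that the running cleanup can be represented as a cancellable future. `RunningCleanup` is a hypothetical class, not part of James. In a container-managed class the `cancelOnShutdown` method would carry the `@PreDestroy` annotation; here it is called directly so the example needs only the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for the currently running cleanup (not a James class).
final class RunningCleanup {
    private final AtomicReference<CompletableFuture<Void>> current = new AtomicReference<>();

    // Remembers the execution so that it can be cancelled later.
    void started(CompletableFuture<Void> execution) {
        current.set(execution);
    }

    // Would be annotated with @PreDestroy in a managed class (assumption).
    // Returns true only if an execution was tracked and is now cancelled.
    boolean cancelOnShutdown() {
        CompletableFuture<Void> execution = current.getAndSet(null);
        return execution != null && execution.cancel(true);
    }
}
```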





[GitHub] [james-project] vttranlina commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981916461


##########
server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala:
##########
@@ -44,19 +45,20 @@ object CassandraTaskExecutionDetailsProjectionModule {
 
   val MODULE: CassandraModule = CassandraModule.table(CassandraTaskExecutionDetailsProjectionTable.TABLE_NAME)
     .comment("Projection of TaskExecutionDetails used by the distributed task manager")
-    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE))
+    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE)
+      .withClusteringOrder(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, ClusteringOrder.ASC))
     .statement(statement => types => statement
       .withPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, DataTypes.UUID)
+      .withClusteringColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, DataTypes.TIMESTAMP)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, DataTypes.TIMESTAMP))

Review Comment:
   Thanks for the explanation.





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r982198224


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/services/TasksCleanupService.java:
##########
@@ -0,0 +1,113 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.services;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.task.Task;
+import org.apache.james.task.eventsourcing.TaskAggregateId;
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class TasksCleanupService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TasksCleanupService.class);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long removedTasksCount;
+            private final long processedTaskCount;
+
+            public Snapshot(long removedTasksCount, long processedTaskCount) {
+                this.removedTasksCount = removedTasksCount;
+                this.processedTaskCount = processedTaskCount;
+            }
+
+            public long getRemovedTasksCount() {
+                return removedTasksCount;
+            }
+
+            public long getProcessedTaskCount() {
+                return processedTaskCount;
+            }
+        }
+
+        private final AtomicLong removedTasksCount;
+        private final AtomicLong processedTaskCount;
+
+        public Context() {
+            removedTasksCount = new AtomicLong();
+            processedTaskCount = new AtomicLong();
+        }
+
+        void incrementRemovedTasksCount() {
+            removedTasksCount.incrementAndGet();
+        }
+
+        void incrementProcessedTaskCount() {
+            processedTaskCount.incrementAndGet();
+        }
+
+        void incrementRemovedTasksCount(int count) {
+            removedTasksCount.set(removedTasksCount.get() + count);
+        }
+
+        public Snapshot snapshot() {
+            return new Snapshot(removedTasksCount.get(), processedTaskCount.get());
+        }
+    }
+
+    private final EventStore eventStore;
+    private final TaskExecutionDetailsProjection taskExecutionDetailsProjection;
+
+    @Inject
+    public TasksCleanupService(EventStore eventStore, TaskExecutionDetailsProjection taskExecutionDetailsProjection) {
+        this.eventStore = eventStore;
+        this.taskExecutionDetailsProjection = taskExecutionDetailsProjection;
+    }
+
+    public Mono<Task.Result> removeBeforeDate(Instant beforeDate, Context context) {
+        return Flux.from(taskExecutionDetailsProjection.listDetailsByBeforeDate(beforeDate))
+            .doOnNext(oldTaskDetail -> context.incrementProcessedTaskCount())

Review Comment:
   Filter to exclude waiting/in-progress tasks?
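A minimal sketch of such a filter, assuming a hypothetical `Status` enum (the real task manager may define more states, and this is not its API). Only tasks that are neither waiting nor in progress are kept as candidates for removal.

```java
import java.util.List;
import java.util.stream.Collectors;

final class RemovableTasks {
    // Hypothetical status set; the real task manager may define more states.
    enum Status { WAITING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED }

    // A task is safe to clean up once it is no longer queued or running.
    static boolean isFinished(Status status) {
        return status != Status.WAITING && status != Status.IN_PROGRESS;
    }

    // Keeps only the finished statuses, preserving their order.
    static List<Status> keepFinished(List<Status> statuses) {
        return statuses.stream()
            .filter(RemovableTasks::isFinished)
            .collect(Collectors.toList());
    }
}
```

In the reactive pipeline quoted above, the same predicate would sit in a `filter` step before the counters are incremented.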





[GitHub] [james-project] vttranlina commented on pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
vttranlina commented on PR #1208:
URL: https://github.com/apache/james-project/pull/1208#issuecomment-1264806894

   Please restart the CI.
   I can't reproduce the CI error locally.




[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: CassandraEventStore support removeBefore method for remove old data

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r980833562


##########
event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala:
##########
@@ -36,4 +38,7 @@ trait EventStore {
   def appendAll(events: Iterable[Event]): Publisher[Void]
 
   def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History]
+
+  def removeBefore(before: Instant): Publisher[Void]

Review Comment:
   No. We will list old tasks, then destroy their additional information here based on AggregateId.
   
   No need for a date.
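The two-step flow described above can be sketched as follows, with in-memory maps standing in for the projection and the event store. `AggregateCleanup` and its methods are hypothetical, not the James API. The point is that only the projection is queried by date, while events are removed purely by aggregate id.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model: the projection knows submission dates,
// while the event store only knows aggregate ids.
final class AggregateCleanup {
    private final Map<String, Instant> submittedDateByTask = new HashMap<>();
    private final Map<String, List<String>> eventsByAggregate = new HashMap<>();

    void record(String taskId, Instant submittedDate, String event) {
        submittedDateByTask.put(taskId, submittedDate);
        eventsByAggregate.computeIfAbsent(taskId, id -> new ArrayList<>()).add(event);
    }

    // Step 1: the projection lists old tasks. Step 2: events are removed by id.
    int removeBefore(Instant before) {
        List<String> oldTaskIds = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : submittedDateByTask.entrySet()) {
            if (entry.getValue().isBefore(before)) {
                oldTaskIds.add(entry.getKey());
            }
        }
        for (String taskId : oldTaskIds) {
            eventsByAggregate.remove(taskId); // the event store needs no date
            submittedDateByTask.remove(taskId);
        }
        return oldTaskIds.size();
    }

    boolean hasEvents(String taskId) {
        return eventsByAggregate.containsKey(taskId);
    }
}
```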





[GitHub] [james-project] chibenwa commented on pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on PR #1208:
URL: https://github.com/apache/james-project/pull/1208#issuecomment-1264297916

   Issue with injections:
   
   ```
   Unable to provision, see the following errors:
   
   1) [Guice/CanNotProxyClass]: Tried proxying JsonTaskSerializer to support a circular dependency, but it is not an interface.
     at RabbitMQWorkQueueSupplier.<init>(RabbitMQWorkQueueSupplier.scala:32)
         \_ for 3rd parameter jsonTaskSerializer
     at DistributedTaskManagerModule.configure(DistributedTaskManagerModule.java:70)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$OverrideModule -> DistributedTaskManagerModule
     while locating RabbitMQWorkQueueSupplier
     at EventSourcingTaskManager.<init>(EventSourcingTaskManager.scala:41)
         \_ for 1st parameter workQueueSupplier
     at DistributedTaskManagerModule.configure(DistributedTaskManagerModule.java:69)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$OverrideModule -> DistributedTaskManagerModule
     while locating EventSourcingTaskManager
     at TasksCleanupService.<init>(TasksCleanupService.java:88)
         \_ for 1st parameter
     at TasksCleanupRoutesModule.configure(TasksCleanupRoutesModule.java:34)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> TasksCleanupRoutesModule
     at TasksCleanupTaskSerializationModule.tasksCleanupTask(TasksCleanupTaskSerializationModule.java:42)
         \_ for 1st parameter
     at TasksCleanupTaskSerializationModule.tasksCleanupTask(TasksCleanupTaskSerializationModule.java:42)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> TasksCleanupTaskSerializationModule
     while locating TaskDTOModule<? extends Task, ? extends TaskDTO> annotated with @Element(setName=,uniqueId=4121, type=MULTIBINDER, keyType=)
     at TaskSerializationModule.provideTaskDTOModules(TaskSerializationModule.java:83)
         \_ for 1st parameter
     at TaskSerializationModule.provideTaskDTOModules(TaskSerializationModule.java:83)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$OverrideModule -> DistributedTaskManagerModule -> TaskSerializationModule
     at JsonTaskSerializer.<init>(JsonTaskSerializer.java:59)
         \_ for 1st parameter
     at DistributedTaskSerializationModule.taskCancelRequestedSerialization(DistributedTaskSerializationModule.java:71)
         \_ for 1st parameter
     at DistributedTaskSerializationModule.taskCancelRequestedSerialization(DistributedTaskSerializationModule.java:71)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$OverrideModule -> DistributedTaskSerializationModule
     while locating EventDTOModule<? extends Event, ? extends EventDTO> annotated with @Element(setName=,uniqueId=4427, type=MULTIBINDER, keyType=)
     at JsonEventSerializer.<init>(JsonEventSerializer.scala:64)
         \_ for 1st parameter modules
     at EventStoreDao.<init>(EventStoreDao.scala:35)
         \_ for 2nd parameter jsonEventSerializer
     at CassandraEventStore.<init>(CassandraEventStore.scala:30)
         \_ for 1st parameter eventStoreDao
     at CassandraEventStoreModule.configure(CassandraEventStoreModule.java:38)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> CassandraEventStoreModule
     while locating CassandraEventStore
     at EventsourcingConfigurationManagement.<init>(EventsourcingConfigurationManagement.java:47)
         \_ for 1st parameter
     at CassandraMailQueueViewModule.configure(CassandraMailQueueViewModule.java:69)
         \_ installed by: Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$OverrideModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> Modules$CombinedModule -> CassandraMailQueueViewModule
     at CassandraMailQueueViewStartUpCheck.<init>(CassandraMailQueueViewStartUpCheck.java:38)
         \_ for 1st parameter
     while locating CassandraMailQueueViewStartUpCheck
     while locating StartUpCheck annotated with @Element(setName=,uniqueId=4436, type=MULTIBINDER, keyType=)
     at StartUpChecksPerformer$StartUpChecks.<init>(StartUpChecksPerformer.java:80)
         \_ for 1st parameter
     at StartUpChecksPerformer.<init>(StartUpChecksPerformer.java:115)
         \_ for 1st parameter
     while locating StartUpChecksPerformer
   ```
   
   Please have a look.
   
   https://ci-builds.apache.org/job/james/job/ApacheJames/job/PR-1208/5/




[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981891151


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/tasks/TasksCleanupTaskRegistration.java:
##########
@@ -0,0 +1,46 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.tasks;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+import org.apache.james.webadmin.services.TasksCleanupService;
+import org.apache.james.webadmin.utils.ParametersExtractor;
+
+import spark.Request;
+
+public class TasksCleanupTaskRegistration extends TaskFromRequestRegistry.TaskRegistration {
+
+    private static final TaskRegistrationKey REGISTRATION_KEY = TaskRegistrationKey.of("tasks-cleanup");
+
+
+    public TasksCleanupTaskRegistration(TasksCleanupService service, Clock clock) {
+        super(REGISTRATION_KEY, request -> toTask(request, service, clock));
+    }
+
+    private static TasksCleanupTask toTask(Request request, TasksCleanupService service, Clock clock) {
+        Duration orderThanDuration = ParametersExtractor.extractDuration(request, "orderThan")
+            .orElseThrow(() -> new IllegalArgumentException("missing or invalid `orderThan` parameter"));

Review Comment:
   ```suggestion
           Duration orderThanDuration = ParametersExtractor.extractDuration(request, "olderThan")
               .orElseThrow(() -> new IllegalArgumentException("missing or invalid `olderThan` parameter"));
   ```
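
For illustration, here is a minimal sketch of how an `olderThan` duration could be turned into the cutoff instant the cleanup works against. It assumes the cutoff is simply "now minus `olderThan`" as read from the injected `Clock`; the class and method names are hypothetical and are not part of this pull request.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not code from the pull request.
final class OlderThanCutoffSketch {
    // Assumption: tasks submitted before (now - olderThan) are eligible for cleanup.
    static Instant cutoff(Clock clock, Duration olderThan) {
        return clock.instant().minus(olderThan);
    }
}
```

Passing the `Clock` in, rather than calling `Instant.now()`, keeps the computation deterministic in tests, which is presumably why the registration receives one.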





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981890666


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/tasks/TasksCleanupTaskAdditionalInformationDTO.java:
##########
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.tasks;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TasksCleanupTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+
+    public static AdditionalInformationDTOModule<TasksCleanupTask.Details, TasksCleanupTaskAdditionalInformationDTO> module() {
+        return DTOModule
+            .forDomainObject(TasksCleanupTask.Details.class)
+            .convertToDTO(TasksCleanupTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(dto -> new TasksCleanupTask.Details(dto.getTimestamp(), dto.getRemovedTaskCount(), dto.getOrderThan()))
+            .toDTOConverter((domain, type) -> new TasksCleanupTaskAdditionalInformationDTO(type, domain.getRemovedTasksCount(), domain.getOrderThan(), domain.timestamp()))
+            .typeName(TasksCleanupTask.TASK_TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+    }
+
+    private final String type;
+    private final long removedTaskCount;
+    private final Instant orderThan;

Review Comment:
   ```suggestion
       private final Instant olderThan;
   ```





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981892331


##########
server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala:
##########
@@ -44,19 +45,20 @@ object CassandraTaskExecutionDetailsProjectionModule {
 
   val MODULE: CassandraModule = CassandraModule.table(CassandraTaskExecutionDetailsProjectionTable.TABLE_NAME)
     .comment("Projection of TaskExecutionDetails used by the distributed task manager")
-    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE))
+    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE)
+      .withClusteringOrder(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, ClusteringOrder.ASC))
     .statement(statement => types => statement
       .withPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, DataTypes.UUID)
+      .withClusteringColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, DataTypes.TIMESTAMP)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, DataTypes.TIMESTAMP))

Review Comment:
   Changing a Cassandra module is a hard red line!
   
   What's the migration strategy here? Can we avoid such a breaking change?
   
   IMO we would be better off not touching the data layer in this pull request.





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981892934


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/tasks/TasksCleanupTaskRegistration.java:
##########
@@ -0,0 +1,46 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.tasks;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+
+import org.apache.james.webadmin.services.TasksCleanupService;
+import org.apache.james.webadmin.utils.ParametersExtractor;
+
+import spark.Request;
+
+public class TasksCleanupTaskRegistration extends TaskFromRequestRegistry.TaskRegistration {

Review Comment:
   No tests?





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: CassandraEventStore support removeBefore method for remove old data

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r980827109


##########
event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala:
##########
@@ -38,7 +39,16 @@ class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends Event
 
   private def doAppendAll(events: Iterable[Event]): SMono[Void] = {
     Preconditions.checkArgument(Event.belongsToSameAggregate(events))
-    eventStoreDao.appendAll(events)
+
+    val preconditionPublisher = SFlux.fromIterable(events)

Review Comment:
   No, IMO we should delete an entire aggregate. That can be done atomically, so we do not need preconditions.
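
As a rough sketch of that idea, the in-memory store below keeps one entry per aggregate, so removing an aggregate is a single operation and no per-event precondition check is required. Everything here (class name, string-typed ids and events) is a hypothetical stand-in chosen for illustration; it is not the Cassandra implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for an event store, not the James implementation.
final class InMemoryEventStoreSketch {
    // One entry per aggregate: the key plays the role of a partition key.
    private final Map<String, List<String>> eventsByAggregate = new ConcurrentHashMap<>();

    void append(String aggregateId, String event) {
        // compute() runs atomically for the given key.
        eventsByAggregate.compute(aggregateId, (id, events) -> {
            List<String> updated = events == null ? new ArrayList<>() : new ArrayList<>(events);
            updated.add(event);
            return updated;
        });
    }

    List<String> eventsOf(String aggregateId) {
        return List.copyOf(eventsByAggregate.getOrDefault(aggregateId, List.of()));
    }

    // Dropping the whole aggregate is one map operation; other aggregates are untouched.
    void remove(String aggregateId) {
        eventsByAggregate.remove(aggregateId);
    }
}
```

Because the history of an aggregate disappears as a whole, there is no partially deleted history that a later append would have to be checked against.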





[GitHub] [james-project] Arsnael commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
Arsnael commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r983063827


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/services/TasksCleanupService.java:
##########
@@ -0,0 +1,113 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.services;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.task.Task;
+import org.apache.james.task.eventsourcing.TaskAggregateId;
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class TasksCleanupService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TasksCleanupService.class);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long removedTasksCount;
+            private final long processedTaskCount;
+
+            public Snapshot(long removedTasksCount, long processedTaskCount) {
+                this.removedTasksCount = removedTasksCount;
+                this.processedTaskCount = processedTaskCount;
+            }
+
+            public long getRemovedTasksCount() {
+                return removedTasksCount;
+            }
+
+            public long getProcessedTaskCount() {
+                return processedTaskCount;
+            }
+        }
+
+        private final AtomicLong removedTasksCount;
+        private final AtomicLong processedTaskCount;
+
+        public Context() {
+            removedTasksCount = new AtomicLong();
+            processedTaskCount = new AtomicLong();
+        }
+
+        void incrementRemovedTasksCount() {
+            removedTasksCount.incrementAndGet();
+        }
+
+        void incrementProcessedTaskCount() {
+            processedTaskCount.incrementAndGet();
+        }
+
+        void incrementRemovedTasksCount(int count) {
+            removedTasksCount.set(removedTasksCount.get() + count);
+        }
+
+        public Snapshot snapshot() {
+            return new Snapshot(removedTasksCount.get(), processedTaskCount.get());
+        }
+    }
+
+    private final EventStore eventStore;
+    private final TaskExecutionDetailsProjection taskExecutionDetailsProjection;
+
+    @Inject
+    public TasksCleanupService(EventStore eventStore, TaskExecutionDetailsProjection taskExecutionDetailsProjection) {
+        this.eventStore = eventStore;
+        this.taskExecutionDetailsProjection = taskExecutionDetailsProjection;
+    }
+
+    public Mono<Task.Result> removeBeforeDate(Instant beforeDate, Context context) {
+        return Flux.from(taskExecutionDetailsProjection.listDetailsByBeforeDate(beforeDate))
+            .doOnNext(oldTaskDetail -> context.incrementProcessedTaskCount())

Review Comment:
   Do we have another issue then?
   It seems that when James restarts for whatever reason while a task is in progress, the task is not able to recover and continue, yet its status stays "in progress" forever. (With some tasks that take a very long time to finish because we have millions and millions of mails, this actually happens often.)
   
   I would love to see a recovery process for in-progress tasks when James restarts, but my guess is that it's likely a rather complex topic ^^'





[GitHub] [james-project] vttranlina commented on a diff in pull request #1208: CassandraEventStore support removeBefore method for remove old data

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r980829850


##########
event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala:
##########
@@ -36,4 +38,7 @@ trait EventStore {
   def appendAll(events: Iterable[Event]): Publisher[Void]
 
   def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History]
+
+  def removeBefore(before: Instant): Publisher[Void]

Review Comment:
   I'm not sure what you mean: don't we need `beforeDate`?





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981892774


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/tasks/TasksCleanupTaskDTO.java:
##########
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.tasks;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+import org.apache.james.webadmin.services.TasksCleanupService;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TasksCleanupTaskDTO implements TaskDTO {

Review Comment:
   No tests?



##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/tasks/TasksCleanupTaskAdditionalInformationDTO.java:
##########
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.tasks;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TasksCleanupTaskAdditionalInformationDTO implements AdditionalInformationDTO {

Review Comment:
   No tests?





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: CassandraEventStore support removeBefore method for remove old data

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r980827813


##########
event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala:
##########
@@ -36,4 +38,7 @@ trait EventStore {
   def appendAll(events: Iterable[Event]): Publisher[Void]
 
   def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History]
+
+  def removeBefore(before: Instant): Publisher[Void]

Review Comment:
   Should be
   
   ```suggestion
     def remove(aggregateId: AggregateId): Publisher[Void]
   ```





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981890797


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/tasks/TasksCleanupTaskAdditionalInformationDTO.java:
##########
@@ -0,0 +1,74 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.tasks;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TasksCleanupTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+
+    public static AdditionalInformationDTOModule<TasksCleanupTask.Details, TasksCleanupTaskAdditionalInformationDTO> module() {
+        return DTOModule
+            .forDomainObject(TasksCleanupTask.Details.class)
+            .convertToDTO(TasksCleanupTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(dto -> new TasksCleanupTask.Details(dto.getTimestamp(), dto.getRemovedTaskCount(), dto.getOrderThan()))
+            .toDTOConverter((domain, type) -> new TasksCleanupTaskAdditionalInformationDTO(type, domain.getRemovedTasksCount(), domain.getOrderThan(), domain.timestamp()))
+            .typeName(TasksCleanupTask.TASK_TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+    }
+
+    private final String type;
+    private final long removedTaskCount;

Review Comment:
   ```suggestion
       private final long removedTaskCount;
       private final long processedTaskCount;
   ```





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981913236


##########
server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala:
##########
@@ -44,19 +45,20 @@ object CassandraTaskExecutionDetailsProjectionModule {
 
   val MODULE: CassandraModule = CassandraModule.table(CassandraTaskExecutionDetailsProjectionTable.TABLE_NAME)
     .comment("Projection of TaskExecutionDetails used by the distributed task manager")
-    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE))
+    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE)
+      .withClusteringOrder(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, ClusteringOrder.ASC))
     .statement(statement => types => statement
       .withPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, DataTypes.UUID)
+      .withClusteringColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, DataTypes.TIMESTAMP)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, DataTypes.TIMESTAMP))

Review Comment:
   I understand your idea, but it has serious consequences for existing data.
   
   Existing data needs consideration: your changes would break existing installations.
   
   We need to be careful with data structure changes: make them only when necessary, and come up with a migration strategy.
   
   Here my feeling is that we don't need it. Cassandra is OK with reading 100 thousand entries from time to time (which would already be extreme for this use case; a rough estimate is more in the thousand-entry range). Apps are OK filtering such data. Premature optimisation is the root of all evil. Let's not try to optimise this just yet, until we consider that the optimisation is truly needed.
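
To make the alternative concrete, here is a minimal sketch of application-side filtering: the full listing is read as-is and the date predicate runs in the application, so the table definition stays unchanged. The `TaskDetails` type and the method name are hypothetical simplifications for illustration, not the real `TaskExecutionDetails` API.

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch, not code from the pull request.
final class TaskCleanupFilterSketch {
    // Simplified view of a task's execution details (assumed fields).
    static final class TaskDetails {
        final String taskId;
        final Instant submittedDate;

        TaskDetails(String taskId, Instant submittedDate) {
            this.taskId = taskId;
            this.submittedDate = submittedDate;
        }
    }

    // Keeps only the ids of tasks submitted strictly before the cutoff.
    static List<String> idsSubmittedBefore(List<TaskDetails> allTasks, Instant cutoff) {
        return allTasks.stream()
            .filter(task -> task.submittedDate.isBefore(cutoff))
            .map(task -> task.taskId)
            .collect(Collectors.toList());
    }
}
```

With entry counts in the thousands, a single pass like this is cheap compared with the cost and risk of a schema migration.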





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r982203161


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/services/TasksCleanupService.java:
##########
@@ -0,0 +1,113 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.services;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.task.Task;
+import org.apache.james.task.eventsourcing.TaskAggregateId;
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class TasksCleanupService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TasksCleanupService.class);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long removedTasksCount;
+            private final long processedTaskCount;
+
+            public Snapshot(long removedTasksCount, long processedTaskCount) {
+                this.removedTasksCount = removedTasksCount;
+                this.processedTaskCount = processedTaskCount;
+            }
+
+            public long getRemovedTasksCount() {
+                return removedTasksCount;
+            }
+
+            public long getProcessedTaskCount() {
+                return processedTaskCount;
+            }
+        }
+
+        private final AtomicLong removedTasksCount;
+        private final AtomicLong processedTaskCount;
+
+        public Context() {
+            removedTasksCount = new AtomicLong();
+            processedTaskCount = new AtomicLong();
+        }
+
+        void incrementRemovedTasksCount() {
+            removedTasksCount.incrementAndGet();
+        }
+
+        void incrementProcessedTaskCount() {
+            processedTaskCount.incrementAndGet();
+        }
+
+        void incrementRemovedTasksCount(int count) {
+            removedTasksCount.set(removedTasksCount.get() + count);
+        }
+
+        public Snapshot snapshot() {
+            return new Snapshot(removedTasksCount.get(), processedTaskCount.get());
+        }
+    }
+
+    private final EventStore eventStore;
+    private final TaskExecutionDetailsProjection taskExecutionDetailsProjection;
+
+    @Inject
+    public TasksCleanupService(EventStore eventStore, TaskExecutionDetailsProjection taskExecutionDetailsProjection) {
+        this.eventStore = eventStore;
+        this.taskExecutionDetailsProjection = taskExecutionDetailsProjection;
+    }
+
+    public Mono<Task.Result> removeBeforeDate(Instant beforeDate, Context context) {
+        return Flux.from(taskExecutionDetailsProjection.listDetailsByBeforeDate(beforeDate))
+            .doOnNext(oldTaskDetail -> context.incrementProcessedTaskCount())
+            .flatMap(taskExecutionDetails -> Mono.from(eventStore.remove(new TaskAggregateId(taskExecutionDetails.taskId())))

Review Comment:
   ```suggestion
           return Flux.from(taskExecutionDetailsProjection.listDetailsByBeforeDate(beforeDate))
               .doOnNext(oldTaskDetail -> context.incrementProcessedTaskCount())
               .flatMap(taskExecutionDetails -> Mono.from(eventStore.remove(new TaskAggregateId(taskExecutionDetails.taskId())))
               
   ```
   
   How about moving this "deletion logic" into EventSourcingTaskManager?
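
To illustrate the suggestion above, here is a minimal sketch of what moving the deletion logic into the task manager could look like. Every type in it (`InMemoryTaskManager`, `CleanupService`, the two maps) is a hypothetical stand-in invented for the example, not the real James `EventSourcingTaskManager`, `EventStore` or projection API. The point is only the shape: the task manager owns both the event history and the projection, and the webadmin service just delegates.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types are the real James classes.
final class TaskManagerDeletionSketch {

    /** Stand-in for a task manager that owns both the event log and the projection. */
    static final class InMemoryTaskManager {
        // taskId -> submission date (plays the role of the projection)
        private final Map<String, Instant> projection = new LinkedHashMap<>();
        // taskId -> event names (plays the role of the event store)
        private final Map<String, List<String>> events = new LinkedHashMap<>();

        void submit(String taskId, Instant submittedDate) {
            projection.put(taskId, submittedDate);
            events.computeIfAbsent(taskId, id -> new ArrayList<>()).add("Created");
        }

        /** Removes events and projection entries of tasks submitted before the given date. */
        long remove(Instant beforeDate) {
            List<String> toRemove = new ArrayList<>();
            for (Map.Entry<String, Instant> entry : projection.entrySet()) {
                if (entry.getValue().isBefore(beforeDate)) {
                    toRemove.add(entry.getKey());
                }
            }
            for (String taskId : toRemove) {
                events.remove(taskId);      // drop the aggregate's history first
                projection.remove(taskId);  // then the read-side entry
            }
            return toRemove.size();
        }

        boolean knows(String taskId) {
            return projection.containsKey(taskId) || events.containsKey(taskId);
        }
    }

    /** The webadmin-side service then only delegates and reports a count. */
    static final class CleanupService {
        private final InMemoryTaskManager taskManager;

        CleanupService(InMemoryTaskManager taskManager) {
            this.taskManager = taskManager;
        }

        long removeBeforeDate(Instant beforeDate) {
            return taskManager.remove(beforeDate);
        }
    }

    public static void main(String[] args) {
        InMemoryTaskManager taskManager = new InMemoryTaskManager();
        taskManager.submit("old", Instant.parse("2000-01-01T00:00:00Z"));
        taskManager.submit("recent", Instant.parse("2000-01-20T00:00:00Z"));

        long removed = new CleanupService(taskManager)
            .removeBeforeDate(Instant.parse("2000-01-15T12:00:55Z"));
        System.out.println(removed + " task(s) removed");
    }
}
```

With this split, the service no longer needs to know about the event store or the projection at all, which is presumably the motivation for the suggestion.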





[GitHub] [james-project] vttranlina commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r982204327


##########
server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java:
##########
@@ -111,4 +120,65 @@ void listDetailsShouldReturnLastUpdatedRecords() {
         Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
         assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS_UPDATED());
     }
+
+    @Test
+    void listBeforeDateShouldReturnCorrectEntry() {
+        TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-01T00:00:00Z"), ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        TaskExecutionDetails taskExecutionDetails2 = new TaskExecutionDetails(TASK_ID_2(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-20T00:00:00Z"), ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        testee.saveDetails(taskExecutionDetails1).block();
+        testee.saveDetails(taskExecutionDetails2).block();
+
+        assertThat(Flux.from(testee.listDetailsByBeforeDate(Instant.parse("2000-01-15T12:00:55Z"))).collectList().block())
+            .containsOnly(taskExecutionDetails1);
+    }
+
+    @Test
+    void removeShouldDeleteAssignEntry() {
+        TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,

Review Comment:
   Not sure,
   but IMO it will not happen in the real world: this endpoint removes "old tasks", selected by the "olderThan" parameter.
   E.g. I don't think a task would still be in progress after more than 15 days, nor that an admin would try to remove tasks that early.
   
   Of course, this is a matter of code logic, so let's wait for other comments.





[GitHub] [james-project] Arsnael commented on pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
Arsnael commented on PR #1208:
URL: https://github.com/apache/james-project/pull/1208#issuecomment-1261755212

   Needs a rebase
   




[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981891076


##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/tasks/TasksCleanupTaskDTO.java:
##########
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.tasks;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+import org.apache.james.webadmin.services.TasksCleanupService;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TasksCleanupTaskDTO implements TaskDTO {
+
+    public static TaskDTOModule<TasksCleanupTask, TasksCleanupTaskDTO> module(TasksCleanupService service) {
+        return DTOModule
+            .forDomainObject(TasksCleanupTask.class)
+            .convertToDTO(TasksCleanupTaskDTO.class)
+            .toDomainObjectConverter(dto -> new TasksCleanupTask(service, dto.getOrderThan()))
+            .toDTOConverter((domain, type) -> new TasksCleanupTaskDTO(type, domain.getBeforeDate()))
+            .typeName(TasksCleanupTask.TASK_TYPE.asString())
+            .withFactory(TaskDTOModule::new);
+    }
+
+
+    private final String type;
+    private final Instant orderThan;
+
+    public TasksCleanupTaskDTO(@JsonProperty("type") String type,
+                               @JsonProperty("orderThan") Instant orderThan) {
+        this.type = type;
+        this.orderThan = orderThan;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    public Instant getOrderThan() {

Review Comment:
   ```suggestion
       public Instant getOlderThan() {
   ```





[GitHub] [james-project] quantranhong1999 commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
quantranhong1999 commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r982111701


##########
server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala:
##########
@@ -45,4 +46,8 @@ class CassandraTaskExecutionDetailsProjection @Inject()(cassandraTaskExecutionDe
   override def listReactive(): Publisher[TaskExecutionDetails] = cassandraTaskExecutionDetailsProjectionDAO.listDetails()
 
   override def updateReactive(details: TaskExecutionDetails): Publisher[Void] = cassandraTaskExecutionDetailsProjectionDAO.saveDetails(details)
+
+  override def listDetailsByBeforeDate(beforeDate: Instant): Publisher[TaskExecutionDetails] = cassandraTaskExecutionDetailsProjectionDAO.listDetailsByBeforeDate(beforeDate)
+
+  override def remove(taskExecutionDetails: TaskExecutionDetails): Publisher[Void] = ???

Review Comment:
   ```suggestion
     override def remove(taskExecutionDetails: TaskExecutionDetails): Publisher[Void] = cassandraTaskExecutionDetailsProjectionDAO.remove(taskExecutionDetails)
   ```



##########
server/protocols/webadmin/webadmin-cassandra/src/main/java/org/apache/james/webadmin/services/TasksCleanupService.java:
##########
@@ -0,0 +1,113 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.services;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.task.Task;
+import org.apache.james.task.eventsourcing.TaskAggregateId;
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.util.ReactorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class TasksCleanupService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(TasksCleanupService.class);
+
+    public static class Context {
+
+        public static class Snapshot {
+            private final long removedTasksCount;
+            private final long processedTaskCount;
+
+            public Snapshot(long removedTasksCount, long processedTaskCount) {
+                this.removedTasksCount = removedTasksCount;
+                this.processedTaskCount = processedTaskCount;
+            }
+
+            public long getRemovedTasksCount() {
+                return removedTasksCount;
+            }
+
+            public long getProcessedTaskCount() {
+                return processedTaskCount;
+            }
+        }
+
+        private final AtomicLong removedTasksCount;
+        private final AtomicLong processedTaskCount;
+
+        public Context() {
+            removedTasksCount = new AtomicLong();
+            processedTaskCount = new AtomicLong();
+        }
+
+        void incrementRemovedTasksCount() {
+            removedTasksCount.incrementAndGet();
+        }
+
+        void incrementProcessedTaskCount() {
+            processedTaskCount.incrementAndGet();
+        }
+
+        void incrementRemovedTasksCount(int count) {
+            removedTasksCount.set(removedTasksCount.get() + count);
+        }
+
+        public Snapshot snapshot() {
+            return new Snapshot(removedTasksCount.get(), processedTaskCount.get());
+        }
+    }
+
+    private final EventStore eventStore;
+    private final TaskExecutionDetailsProjection taskExecutionDetailsProjection;
+
+    @Inject
+    public TasksCleanupService(EventStore eventStore, TaskExecutionDetailsProjection taskExecutionDetailsProjection) {
+        this.eventStore = eventStore;
+        this.taskExecutionDetailsProjection = taskExecutionDetailsProjection;
+    }
+
+    public Mono<Task.Result> removeBeforeDate(Instant beforeDate, Context context) {
+        return Flux.from(taskExecutionDetailsProjection.listDetailsByBeforeDate(beforeDate))
+            .doOnNext(oldTaskDetail -> context.incrementProcessedTaskCount())

Review Comment:
   What if that task is in progress?



##########
server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java:
##########
@@ -111,4 +120,65 @@ void listDetailsShouldReturnLastUpdatedRecords() {
         Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
         assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS_UPDATED());
     }
+
+    @Test
+    void listBeforeDateShouldReturnCorrectEntry() {
+        TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-01T00:00:00Z"), ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        TaskExecutionDetails taskExecutionDetails2 = new TaskExecutionDetails(TASK_ID_2(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-20T00:00:00Z"), ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        testee.saveDetails(taskExecutionDetails1).block();
+        testee.saveDetails(taskExecutionDetails2).block();
+
+        assertThat(Flux.from(testee.listDetailsByBeforeDate(Instant.parse("2000-01-15T12:00:55Z"))).collectList().block())
+            .containsOnly(taskExecutionDetails1);
+    }
+
+    @Test
+    void removeShouldDeleteAssignEntry() {
+        TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,

Review Comment:
   Q: What if we remove an in-progress task?





[GitHub] [james-project] quantranhong1999 commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
quantranhong1999 commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r982207894


##########
server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAOTest.java:
##########
@@ -111,4 +120,65 @@ void listDetailsShouldReturnLastUpdatedRecords() {
         Stream<TaskExecutionDetails> taskExecutionDetails = testee.listDetails().toStream();
         assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS_UPDATED());
     }
+
+    @Test
+    void listBeforeDateShouldReturnCorrectEntry() {
+        TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-01T00:00:00Z"), ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        TaskExecutionDetails taskExecutionDetails2 = new TaskExecutionDetails(TASK_ID_2(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-20T00:00:00Z"), ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        testee.saveDetails(taskExecutionDetails1).block();
+        testee.saveDetails(taskExecutionDetails2).block();
+
+        assertThat(Flux.from(testee.listDetailsByBeforeDate(Instant.parse("2000-01-15T12:00:55Z"))).collectList().block())
+            .containsOnly(taskExecutionDetails1);
+    }
+
+    @Test
+    void removeShouldDeleteAssignEntry() {
+        TaskExecutionDetails taskExecutionDetails1 = new TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,

Review Comment:
   A re-index task or a ham report over a million messages could take that long, IMO.
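
One possible way to address the in-progress concern raised in this thread is to remove only tasks that have reached a terminal status, whatever their age. The sketch below is illustrative only: `Status` and `TaskSummary` are simplified, hypothetical stand-ins for the James task status and `TaskExecutionDetails`, and which statuses count as finished is an assumption made for the example.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: Status and TaskSummary are simplified stand-ins.
final class FinishedTaskGuard {

    enum Status {
        WAITING(false), IN_PROGRESS(false), CANCEL_REQUESTED(false),
        COMPLETED(true), CANCELLED(true), FAILED(true);

        private final boolean finished;

        Status(boolean finished) {
            this.finished = finished;
        }

        boolean isFinished() {
            return finished;
        }
    }

    static final class TaskSummary {
        final String id;
        final Status status;
        final Instant submittedDate;

        TaskSummary(String id, Status status, Instant submittedDate) {
            this.id = id;
            this.status = status;
            this.submittedDate = submittedDate;
        }
    }

    /** Returns the ids of tasks that are both old enough and no longer running. */
    static List<String> removableIds(List<TaskSummary> tasks, Instant beforeDate) {
        return tasks.stream()
            .filter(task -> task.submittedDate.isBefore(beforeDate))
            // A long-running task (e.g. a large re-index) is skipped even if it is old.
            .filter(task -> task.status.isFinished())
            .map(task -> task.id)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant old = Instant.parse("2000-01-01T00:00:00Z");
        List<TaskSummary> tasks = Arrays.asList(
            new TaskSummary("done", Status.COMPLETED, old),
            new TaskSummary("still-running", Status.IN_PROGRESS, old));

        System.out.println(removableIds(tasks, Instant.parse("2000-01-15T00:00:00Z")));
    }
}
```

The status check costs one extra predicate and removes the need to reason about how long a task can realistically run.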





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981891512


##########
server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala:
##########
@@ -62,23 +60,34 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: CqlSession,
 
   private val listStatement = session.prepare(selectFrom(TABLE_NAME).all().build())
 
+  private val listBeforeDateStatement : PreparedStatement = session.prepare(selectFrom(TABLE_NAME)
+    .all()
+    .allowFiltering()
+    .whereColumn(SUBMITTED_DATE).isLessThanOrEqualTo(bindMarker(SUBMITTED_DATE))
+    .build())
+
+  private val removeStatement : PreparedStatement = session.prepare(deleteFrom(TABLE_NAME)
+    .whereColumn(TASK_ID).isEqualTo(bindMarker(TASK_ID))
+    .whereColumn(SUBMITTED_DATE).isEqualTo(bindMarker(SUBMITTED_DATE))
+    .build())
+
   def saveDetails(details: TaskExecutionDetails): Mono[Void] =
     Mono.from(serializeAdditionalInformation(details)
       .flatMap(serializeAdditionalInformation => {
     val boundStatement =  insertStatement.bind()
       .setUuid(TASK_ID, details.getTaskId.getValue)
       .setString(TYPE, details.getType.asString())
       .setString(STATUS, details.getStatus.getValue)
-      .setUdtValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate))
+      .setInstant(SUBMITTED_DATE, details.getSubmittedDate)

Review Comment:
   Please avoid this breaking change!





[GitHub] [james-project] vttranlina commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
vttranlina commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981902846


##########
server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala:
##########
@@ -44,19 +45,20 @@ object CassandraTaskExecutionDetailsProjectionModule {
 
   val MODULE: CassandraModule = CassandraModule.table(CassandraTaskExecutionDetailsProjectionTable.TABLE_NAME)
     .comment("Projection of TaskExecutionDetails used by the distributed task manager")
-    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE))
+    .options(options => options.withCaching(true, SchemaBuilder.RowsPerPartition.NONE)
+      .withClusteringOrder(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, ClusteringOrder.ASC))
     .statement(statement => types => statement
       .withPartitionKey(CassandraTaskExecutionDetailsProjectionTable.TASK_ID, DataTypes.UUID)
+      .withClusteringColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.ADDITIONAL_INFORMATION, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.TYPE, DataTypes.TEXT)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.STATUS, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, DataTypes.TIMESTAMP)
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, DataTypes.TIMESTAMP)
       .withColumn(CassandraTaskExecutionDetailsProjectionTable.CANCEL_REQUESTED_NODE, DataTypes.TEXT)
-      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, types.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
+      .withColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, DataTypes.TIMESTAMP))

Review Comment:
   My idea is that we can filter on "olderThan" at the DB layer, but that would be hard with a UDT type when ordering.
   





[GitHub] [james-project] chibenwa commented on a diff in pull request #1208: JAMES-3825 Task to clean up tasks

Posted by GitBox <gi...@apache.org>.
chibenwa commented on code in PR #1208:
URL: https://github.com/apache/james-project/pull/1208#discussion_r981891877


##########
server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala:
##########
@@ -62,23 +60,34 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: CqlSession,
 
   private val listStatement = session.prepare(selectFrom(TABLE_NAME).all().build())
 
+  private val listBeforeDateStatement : PreparedStatement = session.prepare(selectFrom(TABLE_NAME)
+    .all()
+    .allowFiltering()
+    .whereColumn(SUBMITTED_DATE).isLessThanOrEqualTo(bindMarker(SUBMITTED_DATE))
+    .build())
+
+  private val removeStatement : PreparedStatement = session.prepare(deleteFrom(TABLE_NAME)
+    .whereColumn(TASK_ID).isEqualTo(bindMarker(TASK_ID))
+    .whereColumn(SUBMITTED_DATE).isEqualTo(bindMarker(SUBMITTED_DATE))
+    .build())
+

Review Comment:
   IMO using listStatement and doing application-side filtering is good enough for now.
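
As a rough illustration of the application-side filtering proposed here, the sketch below lists everything and filters in memory, which avoids any schema change. `TaskSummary` is a hypothetical stand-in for `TaskExecutionDetails`, and a plain `List` stands in for the result of the existing list query; the strict "before" comparison is also an assumption (the statement in the diff uses less-than-or-equal).

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: TaskSummary stands in for TaskExecutionDetails.
final class ApplicationSideFilter {

    static final class TaskSummary {
        final String id;
        final ZonedDateTime submittedDate;

        TaskSummary(String id, ZonedDateTime submittedDate) {
            this.id = id;
            this.submittedDate = submittedDate;
        }
    }

    /** Keeps only the tasks submitted strictly before the cutoff. */
    static List<TaskSummary> submittedBefore(List<TaskSummary> all, Instant beforeDate) {
        return all.stream()
            // Compare instants so that the zone of the stored date does not matter.
            .filter(task -> task.submittedDate.toInstant().isBefore(beforeDate))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TaskSummary> all = Arrays.asList(
            new TaskSummary("task-1", ZonedDateTime.of(2000, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC)),
            new TaskSummary("task-2", ZonedDateTime.of(2000, 1, 20, 0, 0, 0, 0, ZoneOffset.UTC)));

        for (TaskSummary task : submittedBefore(all, Instant.parse("2000-01-15T12:00:55Z"))) {
            System.out.println(task.id);
        }
    }
}
```

Because the dates stay in their current storage format, existing rows remain readable and no migration is needed.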


