You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2020/06/11 15:25:39 UTC

[james-project] 02/17: JAMES-3150 Add the first garbage collection properties

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 58a8c255feb90b7ead395108583e9f70ffde3c92
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 20 16:25:12 2020 +0100

    JAMES-3150 Add the first garbage collection properties
---
 pom.xml                                            |  10 +
 .../blob/blob-deduplicating/doc/gc-properties.adoc |   8 +-
 server/blob/blob-deduplicating/pom.xml             |  18 +-
 .../james/server/blob/deduplication/GC.scala       | 187 +++++++++++++++++
 .../src/test/scala/GCPropertiesTest.scala          | 107 ----------
 .../blob/deduplication/GCPropertiesTest.scala      | 228 +++++++++++++++++++++
 .../james/server/blob/deduplication/State.scala    |  46 +++++
 7 files changed, 483 insertions(+), 121 deletions(-)

diff --git a/pom.xml b/pom.xml
index bea0a41..4c64989 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2581,6 +2581,16 @@
                 <version>0.9.0</version>
             </dependency>
             <dependency>
+                <groupId>org.scalacheck</groupId>
+                <artifactId>scalacheck_${scala.base}</artifactId>
+                <version>1.14.3</version>
+            </dependency>
+            <dependency>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest_${scala.base}</artifactId>
+                <version>3.1.1</version>
+            </dependency>
+            <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>jcl-over-slf4j</artifactId>
                 <version>${slf4j.version}</version>
diff --git a/gc-properties.adoc b/server/blob/blob-deduplicating/doc/gc-properties.adoc
similarity index 74%
rename from gc-properties.adoc
rename to server/blob/blob-deduplicating/doc/gc-properties.adoc
index 7c69c01..6ffed72 100644
--- a/gc-properties.adoc
+++ b/server/blob/blob-deduplicating/doc/gc-properties.adoc
@@ -1,10 +1,10 @@
 = GC properties
 
 1. the execution time of the GC should be linked to
-active dataset but not to global dataset
-(for scalability purpose)
+active dataset (i.e. where the number of references has changed)
+but not to the global dataset (for scalability purposes)
 
-2. GC should run on live dataset
+2. GC should run on the active dataset
 
  2.1. GC should not delete data being referenced by a pending process or
 still referenced
@@ -17,7 +17,7 @@ not have a different outcome than a single one
  3.1. an unreferenced piece of data should be removed after 1 day
 
  3.2. less than 10% of unreferenced data of a significant dataset
-should persist after three GC executions
+should persist
 
 4. GC should report what it does
 
diff --git a/server/blob/blob-deduplicating/pom.xml b/server/blob/blob-deduplicating/pom.xml
index e849535..ada4371 100644
--- a/server/blob/blob-deduplicating/pom.xml
+++ b/server/blob/blob-deduplicating/pom.xml
@@ -23,7 +23,7 @@
     <parent>
         <artifactId>james-server-blob</artifactId>
         <groupId>org.apache.james</groupId>
-        <version>3.5.0-SNAPSHOT</version>
+        <version>3.6.0-SNAPSHOT</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
 
@@ -32,7 +32,7 @@
 
     <name>Apache James :: Server :: Blob :: Deduplicating Blob Storage</name>
     <description>
-        An implementation of BlobStore which deduplicate the stored blobs and use a garbage collector
+        An implementation of BlobStore which deduplicates the stored blobs and uses a garbage collector
         to ensure their effective deletion.
     </description>
 
@@ -75,21 +75,19 @@
             <artifactId>scala-java8-compat_${scala.base}</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.scalactic</groupId>
-            <artifactId>scalactic_2.13</artifactId>
-            <version>3.1.1</version>
+            <groupId>org.scalacheck</groupId>
+            <artifactId>scalacheck_${scala.base}</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_2.13</artifactId>
+            <groupId>org.scalactic</groupId>
+            <artifactId>scalactic_${scala.base}</artifactId>
             <version>3.1.1</version>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.scalacheck</groupId>
-            <artifactId>scalacheck_2.13</artifactId>
-            <version>1.14.3</version>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.base}</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala
new file mode 100644
index 0000000..0fa4ea8
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala
@@ -0,0 +1,187 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.server.blob.deduplication
+
+import org.apache.james.blob.api.BlobId
+
+/**
+ * Isolates and groups Events; generations are totally ordered, [[NonExistingGeneration]] coming first.
+ */
+sealed abstract class Generation extends Comparable[Generation] {
+  /** The generation right before this one. */
+  def previous: Generation
+  /** Steps back `times` generations; returns this generation unchanged when `times` <= 0. */
+  def previous(times: Long): Generation = (1L to times).foldLeft[Generation](this)((current, _) => current.previous)
+  /** The generation right after this one. */
+  def next: Generation
+  /** Steps forward `times` generations; returns this generation unchanged when `times` <= 0. */
+  def next(times: Long): Generation = (1L to times).foldLeft[Generation](this)((current, _) => current.next)
+
+  /** Generations the GC may collect: from this one up to `targetedGeneration` minus [[GC.temporization]], inclusive. */
+  def collectibles(targetedGeneration: Generation): Set[Generation] =
+    Generation.range(this, targetedGeneration.previous(GC.temporization)).toSet
+
+  // Comparison operators derived from compareTo
+  def <(that: Generation): Boolean = compareTo(that) < 0
+  def <=(that: Generation): Boolean = compareTo(that) <= 0
+  def >(that: Generation): Boolean = compareTo(that) > 0
+  def >=(that: Generation): Boolean = compareTo(that) >= 0
+}
+
+object Generation {
+  /** The first valid generation (id 0). */
+  val first: Generation = apply(0)
+
+  /** Negative ids map to [[NonExistingGeneration]], the others to a [[ValidGeneration]]. */
+  def apply(id: Long): Generation =
+    if (id >= 0) ValidGeneration(id) else NonExistingGeneration
+
+  /**
+   * Inclusive, ascending range from `start` to `end` (empty when `start` comes after `end`); NonExistingGeneration sorts first.
+   */
+  def range(start: Generation, end: Generation): Seq[Generation] = (start, end) match {
+    case (ValidGeneration(low), ValidGeneration(high)) => (low to high).map(Generation.apply)
+    case (NonExistingGeneration, ValidGeneration(high)) => NonExistingGeneration +: (0L to high).map(Generation.apply)
+    case (ValidGeneration(_), NonExistingGeneration) => Nil
+    case (NonExistingGeneration, NonExistingGeneration) => Seq(NonExistingGeneration)
+  }
+}
+
+/**
+ * Generation which has existed, identified by its `id` (non-negative when built through [[Generation.apply]]).
+ */
+case class ValidGeneration(id: Long) extends Generation {
+  // Generation.apply turns id -1 into NonExistingGeneration, which therefore precedes generation 0
+  override def previous: Generation = Generation(id - 1)
+
+  override def next: Generation = copy(id = id + 1)
+  /** Valid generations are ordered by id and all come after [[NonExistingGeneration]]. */
+  override def compareTo(t: Generation): Int = t match {
+    case other: ValidGeneration => java.lang.Long.compare(id, other.id)
+    case NonExistingGeneration => 1
+  }
+}
+
+/**
+ * Null Object standing for "before the first generation", used for the initialisation of the GC.
+ */
+case object NonExistingGeneration extends Generation {
+  override def previous: Generation = this // saturates: nothing precedes it
+  override def next: Generation = Generation.first
+
+  /** Comes before every [[ValidGeneration]]. */
+  override def compareTo(t: Generation): Int = t match {
+    case _: ValidGeneration => -1
+    case NonExistingGeneration => 0
+  }
+}
+
+/** One GC run: its sequence `id`, the generations it processed and the last generation it covered. */
+case class Iteration(id: Long, processedGenerations: Set[Generation], lastGeneration: Generation) {
+  /** The following run, which processed `generations` up to `lastGeneration`. */
+  def next(generations: Set[Generation], lastGeneration: Generation): Iteration =
+    copy(id = id + 1, processedGenerations = generations, lastGeneration = lastGeneration)
+}
+object Iteration {
+  /** Starting point before any GC run: nothing processed, no generation covered yet. */
+  def initial: Iteration = Iteration(0, Set.empty, NonExistingGeneration)
+}
+
+/** Opaque identifier of whatever holds a reference to a blob. */
+case class ExternalID(id: String)
+
+/** Model of the users' interactions with blobs: taking or dropping a reference, in a given generation. */
+sealed trait Event {
+  def blob: BlobId
+  def externalId: ExternalID
+  def generation: Generation
+}
+/** `externalId` starts referencing `blobId` during `generation`. */
+case class Reference(externalId: ExternalID, blobId: BlobId, generation: Generation) extends Event {
+  override def blob: BlobId = blobId
+}
+
+/** `reference` is dropped during `generation`; blob and owner are taken from it. */
+case class Dereference(generation: Generation, reference: Reference) extends Event {
+  override def externalId: ExternalID = reference.externalId
+  override def blob: BlobId = reference.blob
+}
+
+object Events {
+  /** Latest generation among `events`, or [[Generation.first]] when there are none. */
+  def getLastGeneration(events: Seq[Event]): Generation = events.iterator.map(_.generation).maxOption.getOrElse(Generation.first)
+}
+
+/** Outcome of planning a GC run: the new Iteration and the (reference generation, blob) pairs to delete. */
+case class Report(iteration: Iteration, blobsToDelete: Set[(Generation, BlobId)])
+
+/**
+ * Reference counts of the blobs at the end of each generation, rebuilt from the References/Dereferences.
+ */
+case class StabilizedState(references: Map[Generation, Seq[Reference]], dereferences: Map[Generation, Seq[Dereference]]) {
+  private val eventGenerations: Set[Generation] = references.keySet ++ dereferences.keySet
+  // Latest generation holding an event: reference counts cannot change after it
+  private val lastGeneration: Generation = eventGenerations.maxOption.getOrElse(Generation.first)
+
+  // Covers every generation from NonExistingGeneration up to lastGeneration, without gaps
+  private val referencedBlobsAcrossGenerations: Map[Generation, ReferencedBlobs] = {
+    val firstGeneration = eventGenerations.minOption.getOrElse(Generation.first)
+    val initialRefs = Generation.range(NonExistingGeneration, firstGeneration.previous).map((_, ReferencedBlobs(Map()))).toMap
+    Generation.range(firstGeneration, lastGeneration).foldLeft(initialRefs)(buildGeneration)
+  }
+
+  // Counts of `generation` = counts of the previous one + its References - its Dereferences
+  private def buildGeneration(refs: Map[Generation, ReferencedBlobs], generation: Generation): Map[Generation, ReferencedBlobs] = {
+    val populatedRefs = references.getOrElse(generation, Seq.empty)
+      .foldLeft(refs(generation.previous))((currentReferences, ref) => currentReferences.addReferences(ref.blobId))
+    val expungedRefs = dereferences.getOrElse(generation, Seq.empty)
+      .foldLeft(populatedRefs)((currentReferences, ref) => currentReferences.removeReferences(ref.reference.blobId))
+    refs + (generation -> expungedRefs)
+  }
+
+  /** Reference counts at the end of `generation`; generations after the last event keep the last known counts. */
+  def referencesAt(generation: Generation): ReferencedBlobs =
+    referencedBlobsAcrossGenerations.getOrElse(generation, referencedBlobsAcrossGenerations(lastGeneration))
+
+  type ReferenceCount = Int
+
+  /** Reference count of each blob; a blob whose count drops to zero is removed from the map. */
+  case class ReferencedBlobs(blobs: Map[BlobId, ReferenceCount]) {
+    def isNotReferenced(blobId: BlobId): Boolean = !blobs.contains(blobId)
+    def addReferences(blobId: BlobId): ReferencedBlobs =
+      ReferencedBlobs(blobs.updatedWith(blobId)(oldCount => Some(oldCount.getOrElse(0) + 1)))
+    def removeReferences(blobId: BlobId): ReferencedBlobs =
+      ReferencedBlobs(blobs.updatedWith(blobId)(oldCount => oldCount.map(_ - 1).filter(_ > 0)))
+  }
+}
+
+object GC {
+  /** Grace delay: a run targeting generation `g` only collects generations up to `g` minus `temporization`. */
+  val temporization: Long = 2
+  /** Plans a GC run (deletes nothing): blobs dereferenced in the generations collectible since `lastIteration` and no longer referenced at the last of them. */
+  def plan(state: StabilizedState, lastIteration: Iteration, targetedGeneration: Generation): Report = {
+    val processedGenerations = lastIteration.lastGeneration.collectibles(targetedGeneration)
+    val candidates = processedGenerations.flatMap(generation => state.dereferences.getOrElse(generation, Seq.empty))
+    // Loop-invariant, hence hoisted; lazy because `max` throws on an empty set (there are then no candidates)
+    lazy val referencesAtLastProcessed = state.referencesAt(processedGenerations.max)
+    val blobsToDelete = candidates.filter(dereference => referencesAtLastProcessed.isNotReferenced(dereference.reference.blobId))
+      .map(dereference => (dereference.reference.generation, dereference.reference.blobId))
+    Report(lastIteration.next(processedGenerations, targetedGeneration.previous(temporization)), blobsToDelete)
+  }
+}
diff --git a/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala b/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala
deleted file mode 100644
index 5de3f44..0000000
--- a/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-import org.apache.james.blob.api.{BlobId, TestBlobId}
-import org.scalacheck.Gen
-import org.scalatest.funsuite.AnyFunSuite
-
-case class Generation(id: Long)
-case class Iteration(id: Long)
-case class ExternalID(id: String) // TODO
-
-sealed trait Event
-case class Reference(externalId: ExternalID, blobId: BlobId, generation: Generation) extends Event
-case class Deletion(generation: Generation, reference: Reference) extends Event
-
-case class Report(iteration: Iteration,
-                  blobsToDelete: Set[(Generation, BlobId)]
-                 )
-
-object Generators {
-
-  val smallInteger = Gen.choose(0L,100L)
-  var current = 0;
-  val generationsGen: Gen[LazyList[Generation]] = Gen.infiniteLazyList(Gen.frequency((90, Gen.const(0)), (9, Gen.const(1)), (1, Gen.const(2))))
-    .map(list => list.scanLeft(0)(_ + _))
-    .map(list => list.map(_.toLong).map(Generation.apply))
-
-  val iterationGen = smallInteger.map(Iteration.apply)
-
-  val blobIdFactory = new TestBlobId.Factory
-
-  def blobIdGen(generation: Generation) : Gen[BlobId] = Gen.uuid.map(uuid =>
-    blobIdFactory.from(s"${generation}_$uuid"))
-
-  val externalIDGen = Gen.uuid.map(uuid => ExternalID(uuid.toString))
-
-  def referenceGen(generation: Generation): Gen[Reference] = for {
-    blobId <- blobIdGen(generation)
-    externalId <- externalIDGen
-  } yield Reference(externalId, blobId, generation)
-
-  def existingReferences : Seq[Event] => Set[Reference] = _
-    .foldLeft((Set[Reference](), Set[Reference]()))((acc, event) => event match {
-      case deletion: Deletion => (acc._1 ++ Set(deletion.reference), acc._2)
-      case reference: Reference => if (acc._1.contains(reference)) {
-        acc
-      } else {
-        (acc._1, acc._2 ++ Set(reference))
-      }
-    })._2
-
-  def deletionGen(previousEvents : Seq[Event], generation: Generation): Gen[Option[Deletion]] = {
-    val persistingReferences = existingReferences(previousEvents)
-    if (persistingReferences.isEmpty) {
-      Gen.const(None)
-    } else {
-      Gen.oneOf(persistingReferences)
-        .map(reference => Deletion(generation, reference))
-        .map(Some(_))
-    }
-  }
-
-  def duplicateReferenceGen(generation: Generation, reference: Reference): Gen[Reference] = {
-    if (reference.generation == generation) {
-      externalIDGen.map(id => reference.copy(externalId = id))
-    } else {
-      referenceGen(generation)
-    }
-  }
-
-  def eventGen(previousEvents: Seq[Event], generation: Generation): Gen[Event] = for {
-    greenAddEvent <- referenceGen(generation)
-    addEvents = previousEvents.flatMap {
-      case x: Reference => Some(x)
-      case _ => None
-    }
-    randomAddEvent <- Gen.oneOf(addEvents)
-    duplicateAddEvent <- duplicateReferenceGen(generation, randomAddEvent)
-    deleteEvent <- deletionGen(previousEvents, generation)
-    event <- Gen.oneOf(Seq(greenAddEvent, duplicateAddEvent) ++ deleteEvent)
-  } yield event
-
-  def eventsGen() : Gen[Seq[Event]] = for {
-    nbEvents <- Gen.choose(0, 100)
-    generations <- generationsGen.map(_.take(nbEvents))
-    startEvent <- referenceGen(Generation.apply(0))
-    events <- foldM(generations, (Seq(startEvent): Seq[Event]))((previousEvents, generation) => eventGen(previousEvents, generation).map(_ +: previousEvents))
-  } yield events.reverse
-
-  def foldM[A, B](fa: LazyList[A], z: B)(f: (B, A) => Gen[B]): Gen[B] = {
-    def step(in: (LazyList[A], B)): Gen[Either[(LazyList[A], B), B]] = {
-      val (s, b) = in
-      if (s.isEmpty)
-        Gen.const(Right(b))
-      else {
-        f (b, s.head).map { bnext =>
-          Left((s.tail, bnext))
-        }
-      }
-    }
-
-    Gen.tailRecM((fa, z))(step)
-  }
-}
-
-class GCPropertiesTest extends AnyFunSuite {
-  test("print sample") {
-    Generators.eventsGen().sample.foreach(_.foreach(println))
-  }
-}
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala
new file mode 100644
index 0000000..ad90f1c
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala
@@ -0,0 +1,228 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.server.blob.deduplication
+
+import java.nio.charset.StandardCharsets
+
+import com.google.common.hash
+import org.apache.james.blob.api.BlobId
+import org.apache.james.server.blob.deduplication.Generators.{OnePassGCTestParameters, TestParameters}
+import org.scalacheck.Prop.forAll
+import org.scalacheck.Test.Parameters
+import org.scalacheck.{Arbitrary, Gen, Properties, Shrink}
+/** Test BlobId made of the `generation` it was created in and a content `hash`. */
+case class GenerationAwareBlobId(generation: Generation, hash: String) extends BlobId {
+  override def asString(): String = s"${generation}_${hash}"
+}
+
+object Generators {
+
+  // generate a sequence of Generations with monotonic numeric ids
+  // 80% of the time, the generation id is not incremented
+  // 19% of the time, the generation id is incremented by 1
+  // 1% of the time, the generation id is incremented by 2
+  // e.g. (0, 0, 0, 2, 3, 3, 4, 5, 5, 5, 5)
+  def nextGenerationsGen(previousGeneration: Generation): Gen[Generation] =
+    Gen.frequency((80, Gen.const(0L)), (19, Gen.const(1L)), (1, Gen.const(2L))).map(previousGeneration.next)
+
+  val externalIDGen: Gen[ExternalID] = Gen.uuid.map(uuid => ExternalID(uuid.toString))
+
+  def referenceGen(generation: Generation, hash: String): Gen[Reference] =
+    externalIDGen.map(externalId => Reference(externalId, GenerationAwareBlobId(generation, hash), generation))
+
+  // Accumulator of existingReferences: a Reference is kept only if its Dereference was not folded before it
+  case class ReferenceAccumulator(dereferenced: Set[Reference], existing: Set[Reference]) {
+    def addDeletion(dereference: Reference) = this.copy(dereferenced = dereferenced + dereference)
+    def addExisting(reference: Reference) = if (dereferenced.contains(reference)) {
+      this
+    } else {
+      this.copy(existing = existing + reference)
+    }
+  }
+
+  object ReferenceAccumulator {
+    val empty: ReferenceAccumulator = ReferenceAccumulator(Set.empty, Set.empty)
+  }
+
+  // References not dereferenced yet; expects events newest first (as built by eventsGen)
+  def existingReferences: Seq[Event] => Set[Reference] =
+    _.foldLeft(ReferenceAccumulator.empty)((acc, event) => event match {
+      case dereference: Dereference => acc.addDeletion(dereference.reference)
+      case reference: Reference => acc.addExisting(reference)
+    }).existing
+
+  def dereferenceGen(previousEvents: Seq[Event], generation: Generation): Gen[Option[Dereference]] = {
+    val remainingReferences: Set[Reference] = existingReferences(previousEvents)
+    if (remainingReferences.isEmpty) {
+      Gen.const(None)
+    } else {
+      Gen.oneOf(remainingReferences)
+        .map(reference => Dereference(generation, reference))
+        .map(Some(_))
+    }
+  }
+
+  val hashGenerator: Gen[String] = Gen.alphaLowerStr.map(content => hash.Hashing.sha256().hashString(content, StandardCharsets.UTF_8).toString)
+
+  // Generate an Event, either a Reference or a Dereference (10% of the time if there are previous Events);
+  // `previousEvents` is ordered newest first
+  def eventGen(previousEvents: Seq[Event], contentHashes: Seq[String]): Gen[Event] =
+    if (previousEvents.isEmpty) {
+      for {
+        hashForEvent <- Gen.oneOf(contentHashes)
+        firstEvent <- referenceGen(Generation.first, hashForEvent)
+      } yield firstEvent
+    } else {
+      def pickEvent(newReferenceEvent: Reference, dereferenceEventOption: Option[Dereference]): Gen[Event] = dereferenceEventOption match {
+        case Some(dereferenceEvent) => Gen.frequency((90, newReferenceEvent), (10, dereferenceEvent))
+        case None => Gen.const(newReferenceEvent)
+      }
+
+      for {
+        generation <- nextGenerationsGen(previousEvents.head.generation)
+        contentHashForEvent <- Gen.oneOf(contentHashes)
+
+        newReferenceEvent <- referenceGen(generation, contentHashForEvent)
+        dereferenceEvent <- dereferenceGen(previousEvents, generation)
+
+        event <- pickEvent(newReferenceEvent, dereferenceEvent)
+      } yield event
+    }
+
+  // Generates a list of Events with a ratio of hashes per event to enforce referencing the same hashes multiple times.
+  def eventsGen(maxNbEvents: Int, hashesPerEventsRatio: Float): Gen[Seq[Event]] = for {
+    hashes <- generateHashes(maxNbEvents, hashesPerEventsRatio)
+    // Generate events iteratively (prepended, hence newest first) until the number of events is reached
+    events <- Gen.tailRecM(Seq[Event]())(previousEvents => {
+      previousEvents.size match {
+        case nbEvents if nbEvents >= maxNbEvents => Gen.const(Right(previousEvents))
+        case _ => eventGen(previousEvents, hashes).map(event => Left(event +: previousEvents))
+      }
+    })
+  } yield events.reverse
+
+  def generateHashes(maxNbEvents: Int, hashesPerEventsRatio: Float): Gen[Seq[String]] = {
+    val nbHashes = Math.ceil(maxNbEvents * hashesPerEventsRatio).intValue
+    Gen.listOfN(nbHashes, hashGenerator)
+  }
+
+  case class TestParameters(events: Seq[Event], generationsToCollect: Seq[Generation])
+  case class OnePassGCTestParameters(events: Seq[Event], generationToCollect: Generation)
+
+  def testParametersGen(eventsGen: Gen[Seq[Event]]): Gen[TestParameters] = for {
+    events <- eventsGen
+    allGenerations = Generation.range(Generation.first, Events.getLastGeneration(events))
+    generationsToCollect <- Gen.someOf(allGenerations)
+  } yield TestParameters(events, generationsToCollect.sorted.toSeq)
+
+  def onePassTestParametersGen(eventsGen: Gen[Seq[Event]]): Gen[OnePassGCTestParameters] = for {
+    events <- eventsGen
+    generation <- Gen.oneOf(if (events.isEmpty) Seq(Generation.first) else events.map(_.generation).toSet)
+  } yield OnePassGCTestParameters(events, generation)
+}
+
+object GCPropertiesTest extends Properties("GC") {
+  val maxNbEvents = 100
+  val hashesPerEventsRatio = 0.2f
+
+  // Arbitrary machinery to enable effective shrinking
+  val arbEvents: Arbitrary[Seq[Event]] = Arbitrary(Gen.choose(0, maxNbEvents).flatMap(Generators.eventsGen(_, hashesPerEventsRatio)))
+  implicit val arbTestParameters: Arbitrary[Generators.TestParameters] = Arbitrary(Generators.testParametersGen(arbEvents.arbitrary))
+  implicit val arbTestParameter: Arbitrary[Generators.OnePassGCTestParameters] = Arbitrary(Generators.onePassTestParametersGen(arbEvents.arbitrary))
+  import org.scalacheck.Shrink._
+
+  override def overrideParameters(p: Parameters): Parameters =
+    p.withMinSuccessfulTests(1000)
+
+  // Keeps only the generations that still exist once `events` has been shrunk
+  def createSaneTestParameters(events: Seq[Event], generations: Seq[Generation]): TestParameters = {
+    val allGenerations = Generation.range(Generation.first, Events.getLastGeneration(events))
+    TestParameters(events, generations.filter(allGenerations.contains(_)))
+  }
+  def createSaneOnePassGCTestParameters(events: Seq[Event], generation: Generation): OnePassGCTestParameters =
+    OnePassGCTestParameters(events, generation)
+
+  // Shrink one component at a time (as ScalaCheck does for tuples) so events can shrink while generations stay put
+  // and vice versa; a cartesian product of both shrinks is empty as soon as one side cannot shrink
+  implicit val shrinkTestParameters: Shrink[Generators.TestParameters] = Shrink {
+    params: Generators.TestParameters =>
+      shrink(params.events).map(events => createSaneTestParameters(events, params.generationsToCollect)) ++
+        shrink(params.generationsToCollect).map(generations => createSaneTestParameters(params.events, generations))
+  }
+
+  implicit val shrinkOnePassGCTestParameters: Shrink[Generators.OnePassGCTestParameters] = Shrink {
+    params: Generators.OnePassGCTestParameters =>
+      shrink(params.events).map(events => createSaneOnePassGCTestParameters(events, params.generationToCollect))
+  }
+
+  property("2.1. GC should not delete data being referenced by a pending process or still referenced") = forAll {
+    testParameters: Generators.TestParameters => {
+      val partitionedBlobsId = partitionBlobs(testParameters.events)
+      // The stabilized state does not depend on the collected generation: build it once
+      val stabilizedState = Interpreter(testParameters.events).stabilize()
+      testParameters.generationsToCollect.forall(generation => {
+        val plannedDeletions = GC.plan(stabilizedState, Iteration.initial, generation).blobsToDelete.map(_._2)
+        partitionedBlobsId.stillReferencedBlobIds.intersect(plannedDeletions).isEmpty
+      })
+    }
+  }
+
+  property("3.2. less than 10% of unreferenced data of a significant dataset should persist") = forAll {
+    testParameters: Generators.OnePassGCTestParameters => {
+      // Targets within GC.temporization of the last generation are not checked (vacuously true)
+      if (testParameters.generationToCollect >= Events.getLastGeneration(testParameters.events).previous(GC.temporization))
+        true
+      else {
+        val plan = GC.plan(Interpreter(testParameters.events).stabilize(), Iteration.initial, testParameters.generationToCollect)
+        // An Event belongs to a collected Generation
+        val relevantEvents: Event => Boolean = event => event.generation <= testParameters.generationToCollect.previous(GC.temporization)
+        val plannedDeletions = plan.blobsToDelete.map(_._2)
+        val partitionedBlobsId = partitionBlobs(testParameters.events.filter(relevantEvents))
+        plannedDeletions.size >= partitionedBlobsId.notReferencedBlobIds.size * 0.9
+      }
+    }
+  }
+
+  /*
+  Oracle: a BlobStore tracking references by plain reference counting, to compare the GC against
+   */
+  def partitionBlobs(events: Seq[Event]): PartitionedEvents = {
+    val (referencingEvents, dereferencingEvents) = events.partition {
+      case _: Reference => true
+      case _: Dereference => false
+    }
+
+    val referencedBlobsCount = referencingEvents.groupMapReduce(_.blob)(_ => 1)(_ + _)
+    val dereferencedBlobsCount = dereferencingEvents.groupMapReduce(_.blob)(_ => 1)(_ + _)
+
+    // Still referenced: more References than Dereferences
+    val stillReferencedBlobIds: Set[BlobId] = referencedBlobsCount.collect {
+      case (blobId, referencesCount) if referencesCount > dereferencedBlobsCount.getOrElse(blobId, 0) => blobId
+    }.toSet
+
+    val notReferencedBlobIds = dereferencedBlobsCount.keySet -- stillReferencedBlobIds
+    PartitionedEvents(stillReferencedBlobIds, notReferencedBlobIds)
+  }
+
+  /** Blob ids the oracle considers still referenced, and those it considers no longer referenced. */
+  case class PartitionedEvents(stillReferencedBlobIds: Set[BlobId], notReferencedBlobIds: Set[BlobId])
+
+}
+
+
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala
new file mode 100644
index 0000000..119d0cf
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala
@@ -0,0 +1,46 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication
+
+/**
+ * Accumulates Events, grouped by generation, to iteratively build a [[StabilizedState]].
+ */
+case class State(references: Map[Generation, Seq[Reference]], deletions: Map[Generation, Seq[Dereference]]) {
+  /** Freezes the accumulated events; `deletions` become the dereferences of the StabilizedState. */
+  def stabilize(): StabilizedState = StabilizedState(references = references, dereferences = deletions)
+
+  /** Records `event` under its generation; within a generation the most recently applied event comes first. */
+  def apply(event: Event): State = event match {
+    case dereference: Dereference => copy(deletions = addElement(deletions, dereference))
+    case reference: Reference => copy(references = addElement(references, reference))
+  }
+
+  private def addElement[T <: Event](collection: Map[Generation, Seq[T]], e: T): Map[Generation, Seq[T]] =
+    collection.updated(e.generation, e +: collection.getOrElse(e.generation, Seq()))
+}
+
+object State {
+  /** The empty State, before any Event has been replayed. */
+  val initial: State = State(references = Map.empty, deletions = Map.empty)
+}
+/** Replays `events`, in the given order, onto [[State.initial]]. */
+object Interpreter {
+  def apply(events: Seq[Event]): State = events.foldLeft(State.initial)(_.apply(_))
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org