You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2020/06/11 15:25:39 UTC
[james-project] 02/17: JAMES-3150 Add the first garbage collection
properties
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 58a8c255feb90b7ead395108583e9f70ffde3c92
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Fri Mar 20 16:25:12 2020 +0100
JAMES-3150 Add the first garbage collection properties
---
pom.xml | 10 +
.../blob/blob-deduplicating/doc/gc-properties.adoc | 8 +-
server/blob/blob-deduplicating/pom.xml | 18 +-
.../james/server/blob/deduplication/GC.scala | 187 +++++++++++++++++
.../src/test/scala/GCPropertiesTest.scala | 107 ----------
.../blob/deduplication/GCPropertiesTest.scala | 228 +++++++++++++++++++++
.../james/server/blob/deduplication/State.scala | 46 +++++
7 files changed, 483 insertions(+), 121 deletions(-)
diff --git a/pom.xml b/pom.xml
index bea0a41..4c64989 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2581,6 +2581,16 @@
<version>0.9.0</version>
</dependency>
<dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.base}</artifactId>
+ <version>1.14.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.base}</artifactId>
+ <version>3.1.1</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
diff --git a/gc-properties.adoc b/server/blob/blob-deduplicating/doc/gc-properties.adoc
similarity index 74%
rename from gc-properties.adoc
rename to server/blob/blob-deduplicating/doc/gc-properties.adoc
index 7c69c01..6ffed72 100644
--- a/gc-properties.adoc
+++ b/server/blob/blob-deduplicating/doc/gc-properties.adoc
@@ -1,10 +1,10 @@
= GC properties
1. the execution time of the GC should be linked to
-active dataset but not to global dataset
-(for scalability purpose)
+active dataset (i.e. where the number of references has changed)
+but not to global dataset (for scalability purpose)
-2. GC should run on live dataset
+2. GC should run on active dataset
2.1. GC should not delete data being referenced by a pending process or
still referenced
@@ -17,7 +17,7 @@ not have a different outcome than a single one
3.1. an unreferenced piece of data should be removed after 1 day
3.2. less than 10% of unreferenced data of a significant dataset
-should persist after three GC executions
+should persist
4. GC should report what it does
diff --git a/server/blob/blob-deduplicating/pom.xml b/server/blob/blob-deduplicating/pom.xml
index e849535..ada4371 100644
--- a/server/blob/blob-deduplicating/pom.xml
+++ b/server/blob/blob-deduplicating/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>james-server-blob</artifactId>
<groupId>org.apache.james</groupId>
- <version>3.5.0-SNAPSHOT</version>
+ <version>3.6.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -32,7 +32,7 @@
<name>Apache James :: Server :: Blob :: Deduplicating Blob Storage</name>
<description>
- An implementation of BlobStore which deduplicate the stored blobs and use a garbage collector
+ An implementation of BlobStore which deduplicates the stored blobs and uses a garbage collector
to ensure their effective deletion.
</description>
@@ -75,21 +75,19 @@
<artifactId>scala-java8-compat_${scala.base}</artifactId>
</dependency>
<dependency>
- <groupId>org.scalactic</groupId>
- <artifactId>scalactic_2.13</artifactId>
- <version>3.1.1</version>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.base}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.13</artifactId>
+ <groupId>org.scalactic</groupId>
+ <artifactId>scalactic_${scala.base}</artifactId>
<version>3.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.13</artifactId>
- <version>1.14.3</version>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.base}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala
new file mode 100644
index 0000000..0fa4ea8
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala
@@ -0,0 +1,187 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.server.blob.deduplication
+
+import org.apache.james.blob.api.BlobId
+
+/**
+ * A slot of logical time into which Events are grouped.
+ * Generations are totally ordered: NonExistingGeneration comes before every ValidGeneration,
+ * and ValidGenerations are ordered by their id.
+ */
+sealed abstract class Generation extends Comparable[Generation] {
+  def previous: Generation
+  def previous(times: Long): Generation = repeat(times)(_.previous)
+
+  def next: Generation
+  def next(times: Long): Generation = repeat(times)(_.next)
+
+  /**
+   * The generations a GC run starting from this generation may collect when targeting
+   * `targetedGeneration`: from this generation up to `targetedGeneration` stepped back
+   * GC.temporization times, both ends included (empty when this generation is past that bound).
+   */
+  def collectibles(targetedGeneration: Generation): Set[Generation] = {
+    val lastCollectible = targetedGeneration.previous(GC.temporization)
+    Generation.range(this, lastCollectible).toSet
+  }
+
+  def <(that: Generation): Boolean = compareTo(that) < 0
+  def <=(that: Generation): Boolean = compareTo(that) <= 0
+  def >(that: Generation): Boolean = compareTo(that) > 0
+  def >=(that: Generation): Boolean = compareTo(that) >= 0
+
+  // Applies `step` `times` times starting from this generation; a non-positive count yields this generation.
+  private def repeat(times: Long)(step: Generation => Generation): Generation =
+    (1L to times).foldLeft(this: Generation)((current, _) => step(current))
+}
+
+object Generation {
+  val first: Generation = apply(0)
+
+  /**
+   * Builds a Generation from its numeric id; any negative id maps to NonExistingGeneration.
+   */
+  def apply(id: Long): Generation =
+    if (id >= 0) ValidGeneration(id) else NonExistingGeneration
+
+  /**
+   * Every generation from `start` to `end`, both included, in ascending order.
+   * NonExistingGeneration precedes generation 0; the result is empty when `start` is after `end`.
+   */
+  def range(start: Generation, end: Generation): Seq[Generation] = end match {
+    case NonExistingGeneration => start match {
+      case NonExistingGeneration => Seq(NonExistingGeneration)
+      case ValidGeneration(_) => Nil
+    }
+    case ValidGeneration(upper) => start match {
+      case NonExistingGeneration => NonExistingGeneration +: (0L to upper).map(Generation.apply)
+      case ValidGeneration(lower) => (lower to upper).map(Generation.apply)
+    }
+  }
+}
+
+/**
+ * A generation that actually happened, identified by a non-negative id.
+ */
+case class ValidGeneration(id: Long) extends Generation {
+  override def previous: Generation = Generation(id - 1)
+
+  override def next: Generation = ValidGeneration(id + 1)
+
+  // Any ValidGeneration comes after NonExistingGeneration; ValidGenerations compare by id.
+  override def compareTo(t: Generation): Int = t match {
+    case ValidGeneration(otherId) => java.lang.Long.compare(id, otherId)
+    case NonExistingGeneration => 1
+  }
+}
+
+/**
+ * Null Object standing for "no generation yet", used to initialise the GC.
+ * It is its own predecessor and sorts before every ValidGeneration.
+ */
+case object NonExistingGeneration extends Generation {
+  override def previous: Generation = this
+
+  override def next: Generation = Generation.first
+
+  override def compareTo(t: Generation): Int = t match {
+    case _: ValidGeneration => -1
+    case NonExistingGeneration => 0
+  }
+}
+
+/**
+ * A run of the GC: its sequence number, the generations it processed and the last generation it covered.
+ */
+case class Iteration(id: Long, processedGenerations: Set[Generation], lastGeneration: Generation) {
+  // The following run, numbered right after this one; `lastGeneration` here is the parameter, not the field.
+  def next(generations: Set[Generation], lastGeneration: Generation): Iteration =
+    copy(id = id + 1, processedGenerations = generations, lastGeneration = lastGeneration)
+}
+
+object Iteration {
+  /** Starting point before any GC run: nothing processed, no generation covered. */
+  def initial: Iteration = Iteration(id = 0, processedGenerations = Set.empty, lastGeneration = NonExistingGeneration)
+}
+
+/**
+ * Identifies the external owner of a Reference to a blob.
+ */
+case class ExternalID(id: String)
+
+/**
+ * Models users' interactions related to blobs, each one happening within a generation.
+ */
+sealed trait Event {
+  def blob: BlobId
+  def externalId: ExternalID
+  def generation: Generation
+}
+
+/**
+ * `externalId` starts referencing `blobId` during `generation`.
+ */
+case class Reference(externalId: ExternalID, blobId: BlobId, generation: Generation) extends Event {
+  override def blob: BlobId = blobId
+}
+
+/**
+ * `reference` is dropped during `generation`; blob and external id are taken from that reference.
+ */
+case class Dereference(generation: Generation, reference: Reference) extends Event {
+  override def blob: BlobId = reference.blobId
+  override def externalId: ExternalID = reference.externalId
+}
+
+object Events {
+  /**
+   * The greatest generation among `events`, or Generation.first when `events` is empty.
+   */
+  def getLastGeneration(events: Seq[Event]): Generation =
+    events.iterator.map(_.generation).maxOption.getOrElse(Generation.first)
+}
+
+/**
+ * Outcome of a planned GC run: the iteration it represents and the (generation, blob) pairs to delete.
+ */
+case class Report(iteration: Iteration, blobsToDelete: Set[(Generation, BlobId)])
+
+/**
+ * Read-only accessors to the References/Dereferences made by generations, able to tell which
+ * blobs are still referenced at the end of any generation.
+ */
+case class StabilizedState(references: Map[Generation, Seq[Reference]], dereferences: Map[Generation, Seq[Dereference]]) {
+  // Last generation holding at least one event (Generation.first when there is none).
+  // Reference counts never change after it.
+  private val lastKnownGeneration: Generation =
+    (references.keys ++ dereferences.keys).maxOption.getOrElse(Generation.first)
+
+  // Reference counts at the end of every generation from NonExistingGeneration to lastKnownGeneration.
+  private val referencedBlobsAcrossGenerations: Map[Generation, ReferencedBlobs] = {
+    val firstKnownGeneration = (references.keys ++ dereferences.keys).minOption.getOrElse(Generation.first)
+
+    // Nothing is referenced before the first generation holding an event.
+    val initialRefs = Generation.range(NonExistingGeneration, firstKnownGeneration.previous)
+      .map(generation => generation -> ReferencedBlobs(Map()))
+      .toMap
+    Generation.range(firstKnownGeneration, lastKnownGeneration)
+      .foldLeft(initialRefs)(buildGeneration)
+  }
+
+  // Derives the counts of `generation` from those of its predecessor, applying its References
+  // first and then its Dereferences.
+  private def buildGeneration(refs: Map[Generation, ReferencedBlobs], generation: Generation): Map[Generation, ReferencedBlobs] = {
+    val populatedRefs = references.getOrElse(generation, Seq.empty)
+      .foldLeft(refs(generation.previous))((currentReferences, ref) => currentReferences.addReferences(ref.blobId))
+
+    val expungedRefs = dereferences.getOrElse(generation, Seq.empty)
+      .foldLeft(populatedRefs)((currentReferences, deref) => currentReferences.removeReferences(deref.reference.blobId))
+
+    refs + (generation -> expungedRefs)
+  }
+
+  /**
+   * Blobs referenced at the end of `generation`.
+   * Generations after the last one holding an event share its state, as nothing changed since then;
+   * previously they made the lookup throw a NoSuchElementException.
+   */
+  def referencesAt(generation: Generation): ReferencedBlobs =
+    if (generation > lastKnownGeneration) referencedBlobsAcrossGenerations(lastKnownGeneration)
+    else referencedBlobsAcrossGenerations(generation)
+
+  type ReferenceCount = Int
+
+  /**
+   * Number of live references per blob; a blob whose count drops to zero is removed from the map.
+   */
+  case class ReferencedBlobs(blobs: Map[BlobId, ReferenceCount]) {
+    def isNotReferenced(blobId: BlobId): Boolean =
+      !blobs.contains(blobId)
+
+    def addReferences(blobId: BlobId): ReferencedBlobs =
+      ReferencedBlobs(blobs.updatedWith(blobId)(oldCount => Some(oldCount.fold(1)(_ + 1))))
+
+    def removeReferences(blobId: BlobId): ReferencedBlobs =
+      ReferencedBlobs(blobs.updatedWith(blobId)(oldCount => oldCount.map(_ - 1).filter(_ > 0)))
+  }
+
+}
+
+object GC {
+  /**
+   * Number of generations, counting back from the targeted one, that a GC run never collects.
+   */
+  val temporization: Long = 2
+
+  /**
+   * Plans a GC run targeting `targetedGeneration`.
+   *
+   * The processed generations go from the last generation covered by `lastIteration` up to
+   * `targetedGeneration` stepped back [[temporization]] times. The selected blobs are those
+   * dereferenced during the processed generations and no longer referenced at the end of the
+   * last processed generation.
+   *
+   * @return the next Iteration, together with (generation of the dropped Reference, blob) pairs to delete
+   */
+  def plan(state: StabilizedState, lastIteration: Iteration, targetedGeneration: Generation): Report = {
+    val processedGenerations = lastIteration.lastGeneration.collectibles(targetedGeneration)
+    val dereferencesToProcess = state.dereferences
+      .filter { case (generation, _) => processedGenerations.contains(generation) }
+      .values
+      .flatten
+      .toSet
+
+    val blobsToDelete: Set[(Generation, BlobId)] =
+      if (dereferencesToProcess.isEmpty) {
+        Set.empty
+      } else {
+        // Loop-invariant: looked up once instead of once per dereference. Only done when some
+        // dereference is processed, which also guarantees processedGenerations is non-empty.
+        val remainingReferences = state.referencesAt(processedGenerations.max)
+        dereferencesToProcess
+          .filter(dereference => remainingReferences.isNotReferenced(dereference.reference.blobId))
+          .map(dereference => (dereference.reference.generation, dereference.reference.blobId))
+      }
+
+    Report(lastIteration.next(processedGenerations, targetedGeneration.previous(temporization)), blobsToDelete)
+  }
+}
diff --git a/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala b/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala
deleted file mode 100644
index 5de3f44..0000000
--- a/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-import org.apache.james.blob.api.{BlobId, TestBlobId}
-import org.scalacheck.Gen
-import org.scalatest.funsuite.AnyFunSuite
-
-case class Generation(id: Long)
-case class Iteration(id: Long)
-case class ExternalID(id: String) // TODO
-
-sealed trait Event
-case class Reference(externalId: ExternalID, blobId: BlobId, generation: Generation) extends Event
-case class Deletion(generation: Generation, reference: Reference) extends Event
-
-case class Report(iteration: Iteration,
- blobsToDelete: Set[(Generation, BlobId)]
- )
-
-object Generators {
-
- val smallInteger = Gen.choose(0L,100L)
- var current = 0;
- val generationsGen: Gen[LazyList[Generation]] = Gen.infiniteLazyList(Gen.frequency((90, Gen.const(0)), (9, Gen.const(1)), (1, Gen.const(2))))
- .map(list => list.scanLeft(0)(_ + _))
- .map(list => list.map(_.toLong).map(Generation.apply))
-
- val iterationGen = smallInteger.map(Iteration.apply)
-
- val blobIdFactory = new TestBlobId.Factory
-
- def blobIdGen(generation: Generation) : Gen[BlobId] = Gen.uuid.map(uuid =>
- blobIdFactory.from(s"${generation}_$uuid"))
-
- val externalIDGen = Gen.uuid.map(uuid => ExternalID(uuid.toString))
-
- def referenceGen(generation: Generation): Gen[Reference] = for {
- blobId <- blobIdGen(generation)
- externalId <- externalIDGen
- } yield Reference(externalId, blobId, generation)
-
- def existingReferences : Seq[Event] => Set[Reference] = _
- .foldLeft((Set[Reference](), Set[Reference]()))((acc, event) => event match {
- case deletion: Deletion => (acc._1 ++ Set(deletion.reference), acc._2)
- case reference: Reference => if (acc._1.contains(reference)) {
- acc
- } else {
- (acc._1, acc._2 ++ Set(reference))
- }
- })._2
-
- def deletionGen(previousEvents : Seq[Event], generation: Generation): Gen[Option[Deletion]] = {
- val persistingReferences = existingReferences(previousEvents)
- if (persistingReferences.isEmpty) {
- Gen.const(None)
- } else {
- Gen.oneOf(persistingReferences)
- .map(reference => Deletion(generation, reference))
- .map(Some(_))
- }
- }
-
- def duplicateReferenceGen(generation: Generation, reference: Reference): Gen[Reference] = {
- if (reference.generation == generation) {
- externalIDGen.map(id => reference.copy(externalId = id))
- } else {
- referenceGen(generation)
- }
- }
-
- def eventGen(previousEvents: Seq[Event], generation: Generation): Gen[Event] = for {
- greenAddEvent <- referenceGen(generation)
- addEvents = previousEvents.flatMap {
- case x: Reference => Some(x)
- case _ => None
- }
- randomAddEvent <- Gen.oneOf(addEvents)
- duplicateAddEvent <- duplicateReferenceGen(generation, randomAddEvent)
- deleteEvent <- deletionGen(previousEvents, generation)
- event <- Gen.oneOf(Seq(greenAddEvent, duplicateAddEvent) ++ deleteEvent)
- } yield event
-
- def eventsGen() : Gen[Seq[Event]] = for {
- nbEvents <- Gen.choose(0, 100)
- generations <- generationsGen.map(_.take(nbEvents))
- startEvent <- referenceGen(Generation.apply(0))
- events <- foldM(generations, (Seq(startEvent): Seq[Event]))((previousEvents, generation) => eventGen(previousEvents, generation).map(_ +: previousEvents))
- } yield events.reverse
-
- def foldM[A, B](fa: LazyList[A], z: B)(f: (B, A) => Gen[B]): Gen[B] = {
- def step(in: (LazyList[A], B)): Gen[Either[(LazyList[A], B), B]] = {
- val (s, b) = in
- if (s.isEmpty)
- Gen.const(Right(b))
- else {
- f (b, s.head).map { bnext =>
- Left((s.tail, bnext))
- }
- }
- }
-
- Gen.tailRecM((fa, z))(step)
- }
-}
-
-class GCPropertiesTest extends AnyFunSuite {
- test("print sample") {
- Generators.eventsGen().sample.foreach(_.foreach(println))
- }
-}
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala
new file mode 100644
index 0000000..ad90f1c
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala
@@ -0,0 +1,228 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.server.blob.deduplication
+
+import java.nio.charset.StandardCharsets
+
+import com.google.common.hash
+import org.apache.james.blob.api.BlobId
+import org.apache.james.server.blob.deduplication.Generators.{OnePassGCTestParameters, TestParameters}
+import org.scalacheck.Prop.forAll
+import org.scalacheck.Test.Parameters
+import org.scalacheck.{Arbitrary, Gen, Properties, Shrink}
+
+/**
+ * Test BlobId carrying the generation it was created in alongside the content hash.
+ */
+case class GenerationAwareBlobId(generation: Generation, hash: String) extends BlobId {
+  override def asString(): String = String.valueOf(generation) + "_" + hash
+}
+
+object Generators {
+
+  // Generates the generation following `previousGeneration`; chained calls yield a sequence of
+  // generations with non-decreasing numeric ids:
+  // 80% of the time, the generation id is not incremented
+  // 19% of the time, the generation id is incremented by 1
+  // 1% of the time, the generation id is incremented by 2
+  // i.e. (0, 0, 0, 2, 3, 3, 4, 5, 5, 5, 5)
+  def nextGenerationsGen(previousGeneration: Generation): Gen[Generation] =
+    Gen.frequency((80, Gen.const(0L)), (19, Gen.const(1L)), (1, Gen.const(2L)))
+      .map(increment => previousGeneration.next(increment))
+
+  val externalIDGen: Gen[ExternalID] = Gen.uuid.map(uuid => ExternalID(uuid.toString))
+
+  // A Reference made during `generation`, by a random external id, to the blob of content hash `hash`.
+  def referenceGen(generation: Generation, hash: String): Gen[Reference] =
+    externalIDGen.map(externalId => Reference(externalId, GenerationAwareBlobId(generation, hash), generation))
+
+  /**
+   * Folding state separating dereferenced References from the ones still alive.
+   * It yields the same result whether Events are folded in chronological or reverse chronological order.
+   */
+  case class ReferenceAccumulator(dereferenced: Set[Reference], existing: Set[Reference]) {
+    // Chronological order: the Reference may already be accumulated as existing, so drop it.
+    def addDeletion(dereference: Reference): ReferenceAccumulator =
+      copy(dereferenced = dereferenced + dereference, existing = existing - dereference)
+
+    // Reverse chronological order: the Reference may already be known as dereferenced.
+    def addExisting(reference: Reference): ReferenceAccumulator =
+      if (dereferenced.contains(reference)) this
+      else copy(existing = existing + reference)
+  }
+
+  object ReferenceAccumulator {
+    val empty: ReferenceAccumulator = ReferenceAccumulator(Set.empty, Set.empty)
+  }
+
+  // References made by the given events and not dereferenced by any of them.
+  def existingReferences: Seq[Event] => Set[Reference] =
+    _.foldLeft(ReferenceAccumulator.empty)((acc, event) => event match {
+      case dereference: Dereference => acc.addDeletion(dereference.reference)
+      case reference: Reference => acc.addExisting(reference)
+    }).existing
+
+  // A Dereference, made during `generation`, of a Reference still alive in `previousEvents`;
+  // None when every Reference has already been dereferenced.
+  def dereferenceGen(previousEvents: Seq[Event], generation: Generation): Gen[Option[Dereference]] = {
+    val remainingReferences: Set[Reference] = existingReferences(previousEvents)
+    if (remainingReferences.isEmpty) {
+      Gen.const(None)
+    } else {
+      Gen.oneOf(remainingReferences)
+        .map(reference => Dereference(generation, reference))
+        .map(Some(_))
+    }
+  }
+
+  val hashGenerator: Gen[String] = Gen.alphaLowerStr.map(content => hash.Hashing.sha256().hashString(content, StandardCharsets.UTF_8).toString)
+
+  // Generates an Event: the very first one is a Reference in Generation.first; afterwards a
+  // Dereference 10% of the time when some Reference is still alive, a Reference otherwise.
+  // `previousEvents` is in reverse chronological order (latest event first, see eventsGen).
+  def eventGen(previousEvents: Seq[Event], contentHashes: Seq[String]): Gen[Event] =
+    if (previousEvents.isEmpty) {
+      for {
+        hashForEvent <- Gen.oneOf(contentHashes)
+        firstEvent <- referenceGen(Generation.first, hashForEvent)
+      } yield firstEvent
+    } else {
+      def pickEvent(newReferenceEvent: Reference, dereferenceEventOption: Option[Dereference]): Gen[Event] = dereferenceEventOption match {
+        case Some(dereferenceEvent) => Gen.frequency((90, Gen.const[Event](newReferenceEvent)), (10, Gen.const[Event](dereferenceEvent)))
+        case None => Gen.const(newReferenceEvent)
+      }
+
+      for {
+        generation <- nextGenerationsGen(previousEvents.head.generation)
+        contentHashForEvent <- Gen.oneOf(contentHashes)
+
+        newReferenceEvent <- referenceGen(generation, contentHashForEvent)
+        dereferenceEvent <- dereferenceGen(previousEvents, generation)
+
+        event <- pickEvent(newReferenceEvent, dereferenceEvent)
+      } yield event
+    }
+
+  // Generates a list of Events with a ratio of hashes per event to enforce referencing the same hashes multiple times.
+  def eventsGen(maxNbEvents: Int, hashesPerEventsRatio: Float): Gen[Seq[Event]] = for {
+    hashes <- generateHashes(maxNbEvents, hashesPerEventsRatio)
+    // Generate events iteratively (latest first) until the number of events is reached
+    events <- Gen.tailRecM(Seq[Event]())(previousEvents => {
+      previousEvents.size match {
+        case nbEvents if nbEvents >= maxNbEvents => Gen.const(Right(previousEvents))
+        case _ => eventGen(previousEvents, hashes).map(event => Left(event +: previousEvents))
+      }
+    })
+  } yield events.reverse
+
+  // Generates ceil(maxNbEvents * hashesPerEventsRatio) content hashes.
+  def generateHashes(maxNbEvents: Int, hashesPerEventsRatio: Float): Gen[Seq[String]] = {
+    val nbHashes = Math.ceil(maxNbEvents * hashesPerEventsRatio).intValue
+    Gen.listOfN(nbHashes, hashGenerator)
+  }
+
+  case class TestParameters(events: Seq[Event], generationsToCollect: Seq[Generation])
+  case class OnePassGCTestParameters(events: Seq[Event], generationToCollect: Generation)
+
+  def testParametersGen(eventsGen: Gen[Seq[Event]]): Gen[TestParameters] = for {
+    events <- eventsGen
+    allGenerations = Generation.range(Generation.first, Events.getLastGeneration(events))
+    generationsToCollect <- Gen.someOf(allGenerations)
+  } yield TestParameters(events, generationsToCollect.sorted.toSeq)
+
+  def onePassTestParametersGen(eventsGen: Gen[Seq[Event]]): Gen[OnePassGCTestParameters] = for {
+    events <- eventsGen
+    generation <- Gen.oneOf(if (events.isEmpty) Seq(Generation.first) else events.map(_.generation).toSet)
+  } yield OnePassGCTestParameters(events, generation)
+}
+
+object GCPropertiesTest extends Properties("GC") {
+  val maxNbEvents = 100
+  val hashesPerEventsRatio = 0.2f
+
+  // Arbitrary machinery to enable effective shrinking
+  val arbEvents: Arbitrary[Seq[Event]] = Arbitrary(Gen.choose(0, maxNbEvents).flatMap(Generators.eventsGen(_, hashesPerEventsRatio)))
+  implicit val arbTestParameters: Arbitrary[Generators.TestParameters] = Arbitrary(Generators.testParametersGen(arbEvents.arbitrary))
+  implicit val arbTestParameter: Arbitrary[Generators.OnePassGCTestParameters] = Arbitrary(Generators.onePassTestParametersGen(arbEvents.arbitrary))
+  import org.scalacheck.Shrink._
+
+  override def overrideParameters(p: Parameters): Parameters =
+    p.withMinSuccessfulTests(1000)
+
+  // Drops the generations to collect that no longer exist once the events have been shrunk.
+  def createSaneTestParameters(events: Seq[Event], generations: Seq[Generation]): TestParameters = {
+    val allGenerations = Generation.range(Generation.first, Events.getLastGeneration(events))
+    TestParameters(events, generations.filter(allGenerations.contains(_)))
+  }
+  def createSaneOnePassGCTestParameters(events: Seq[Event], generation: Generation): OnePassGCTestParameters = {
+    OnePassGCTestParameters(events, generation)
+  }
+
+  // Shrinks events and generations independently. Combining both shrink streams as a product
+  // would yield nothing, hence stop shrinking altogether, as soon as one of them cannot shrink.
+  implicit val shrinkTestParameters: Shrink[Generators.TestParameters] = Shrink {
+    params: Generators.TestParameters =>
+      shrink(params.events).map(events => createSaneTestParameters(events, params.generationsToCollect)) ++
+        shrink(params.generationsToCollect).map(generations => createSaneTestParameters(params.events, generations))
+  }
+
+  implicit val shrinkOnePassGCTestParameters: Shrink[Generators.OnePassGCTestParameters] = Shrink {
+    params: Generators.OnePassGCTestParameters =>
+      shrink(params.events).map(events => createSaneOnePassGCTestParameters(events, params.generationToCollect))
+  }
+
+  property("2.1. GC should not delete data being referenced by a pending process or still referenced") = forAll {
+    testParameters: Generators.TestParameters => {
+      val partitionedBlobsId = partitionBlobs(testParameters.events)
+      // The state only depends on the events: build it once for all targeted generations.
+      val stabilizedState = Interpreter(testParameters.events).stabilize()
+      testParameters.generationsToCollect.forall(generation => {
+        val plannedDeletions = GC.plan(stabilizedState, Iteration.initial, generation).blobsToDelete.map(_._2)
+        partitionedBlobsId.stillReferencedBlobIds.intersect(plannedDeletions).isEmpty
+      })
+    }
+  }
+
+  property("3.2. less than 10% of unreferenced data of a significant dataset should persist") = forAll {
+    testParameters: Generators.OnePassGCTestParameters => {
+      // Targets within GC.temporization of the last generation are not checked.
+      if (testParameters.generationToCollect >= Events.getLastGeneration(testParameters.events).previous(GC.temporization))
+        true
+      else {
+        val plan = GC.plan(Interpreter(testParameters.events).stabilize(), Iteration.initial, testParameters.generationToCollect)
+        // An Event belongs to a collected Generation
+        val relevantEvents: Event => Boolean = event => event.generation <= testParameters.generationToCollect.previous(GC.temporization)
+        val plannedDeletions = plan.blobsToDelete.map(_._2)
+
+        val partitionedBlobsId = partitionBlobs(testParameters.events.filter(relevantEvents))
+        plannedDeletions.size >= partitionedBlobsId.notReferencedBlobIds.size * 0.9
+      }
+    }
+  }
+
+  /*
+   Oracle: a reference-counting model of the BlobStore. It tells which blobs are still referenced
+   once all `events` are applied, and which dereferenced blobs are no longer referenced at all.
+   */
+  def partitionBlobs(events: Seq[Event]): PartitionedEvents = {
+    val (referencingEvents, dereferencingEvents) = events.partition {
+      case _: Reference => true
+      case _: Dereference => false
+    }
+
+    val referencedBlobsCount = referencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap
+    val dereferencedBlobsCount = dereferencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap
+
+    val stillReferencedBlobIds: Set[BlobId] = referencedBlobsCount.collect {
+      case (blobId, referencesCount) if referencesCount > dereferencedBlobsCount.getOrElse(blobId, 0) => blobId
+    }.toSet
+
+    val notReferencedBlobIds = dereferencedBlobsCount.keySet -- stillReferencedBlobIds
+    PartitionedEvents(stillReferencedBlobIds, notReferencedBlobIds)
+  }
+
+  case class PartitionedEvents(stillReferencedBlobIds: Set[BlobId], notReferencedBlobIds: Set[BlobId])
+
+}
+
+
diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala
new file mode 100644
index 0000000..119d0cf
--- /dev/null
+++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala
@@ -0,0 +1,46 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.server.blob.deduplication
+
+/**
+ * Accumulates Events grouped by generation until they are frozen into a StabilizedState.
+ */
+case class State(references: Map[Generation, Seq[Reference]], deletions: Map[Generation, Seq[Dereference]]) {
+
+  def stabilize(): StabilizedState = StabilizedState(references = references, dereferences = deletions)
+
+  /**
+   * Records `event`, prepending it to the events already held for its generation.
+   */
+  def apply(event: Event): State = event match {
+    case dereference: Dereference => copy(deletions = addElement(deletions, dereference))
+    case reference: Reference => copy(references = addElement(references, reference))
+  }
+
+  private def addElement[T <: Event](collection: Map[Generation, Seq[T]], element: T): Map[Generation, Seq[T]] =
+    collection.updated(element.generation, element +: collection.getOrElse(element.generation, Seq.empty[T]))
+}
+
+object State {
+  /** A State holding no event at all. */
+  val initial: State = State(Map.empty, Map.empty)
+}
+
+/**
+ * Replays Events, in the given order, on top of State.initial.
+ */
+object Interpreter {
+  def apply(events: Seq[Event]): State =
+    events.foldLeft(State.initial)(_ apply _)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org