You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/11/15 19:38:19 UTC
[incubator-pekko-persistence-cassandra] 03/07: format source with scalafmt, #2
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git
commit 9b6b9f1850463f8061545cccc869b848154344dd
Author: Auto Format <nobody>
AuthorDate: Sat Nov 12 07:40:57 2022 +0100
format source with scalafmt, #2
---
build.sbt | 76 ++---
.../cassandra/testkit/CassandraLauncher.scala | 23 +-
.../cassandra/EventsByTagMigration.scala | 1 -
.../cassandra/KeyspaceAndTableStatements.scala | 1 -
.../persistence/cassandra/cleanup/Cleanup.scala | 1 -
.../compaction/BaseCompactionStrategy.scala | 3 +-
.../compaction/LeveledCompactionStrategy.scala | 3 +-
.../compaction/SizeTieredCompactionStrategy.scala | 2 +-
.../persistence/cassandra/journal/Buffer.scala | 4 +-
.../cassandra/journal/CassandraJournal.scala | 17 +-
.../journal/CassandraJournalStatements.scala | 4 +-
.../cassandra/journal/CassandraTagRecovery.scala | 17 +-
.../cassandra/journal/PubSubThrottler.scala | 2 +-
.../persistence/cassandra/journal/TagWriter.scala | 2 +-
.../persistence/cassandra/journal/TagWriters.scala | 7 +-
.../query/EventsByPersistenceIdStage.scala | 1 -
.../cassandra/query/EventsByTagStage.scala | 25 +-
.../query/TagViewSequenceNumberScanner.scala | 4 +-
.../query/javadsl/CassandraReadJournal.scala | 2 -
.../akka/persistence/cassandra/query/package.scala | 8 +-
.../query/scaladsl/CassandraReadJournal.scala | 5 +-
.../snapshot/CassandraSnapshotStore.scala | 12 +-
.../cassandra/MultiNodeClusterSpec.scala | 87 +++--
.../cassandra/CassandraEventsByTagLoadSpec.scala | 21 +-
.../akka/persistence/cassandra/CassandraSpec.scala | 6 +-
.../cassandra/EventsByTagMigrationSpec.scala | 32 +-
.../akka/persistence/cassandra/RetriesSpec.scala | 6 +-
.../cassandra/cleanup/CleanupSpec.scala | 22 +-
.../CassandraCompactionStrategySpec.scala | 8 +-
.../cassandra/query/EventAdaptersReadSpec.scala | 45 +--
.../cassandra/query/EventsByTagSpec.scala | 365 +++++++++++----------
.../cassandra/testkit/CassandraLauncherSpec.scala | 4 +-
.../AllPersistenceIdsMigrationCompileOnly.scala | 4 +-
.../doc/reconciler/ReconciliationCompileOnly.scala | 4 +-
.../test/scala/doc/cleanup/CleanupDocExample.scala | 4 +-
.../cassandra/example/EventProcessorStream.scala | 34 +-
.../akka/persistence/cassandra/example/Main.scala | 68 ++--
.../persistence/cassandra/example/ReadSide.scala | 15 +-
project/Common.scala | 46 +--
project/Dependencies.scala | 18 +-
40 files changed, 513 insertions(+), 496 deletions(-)
diff --git a/build.sbt b/build.sbt
index 1e39948..512dd39 100644
--- a/build.sbt
+++ b/build.sbt
@@ -27,7 +27,7 @@ lazy val core = project
name := "akka-persistence-cassandra",
libraryDependencies ++= Dependencies.akkaPersistenceCassandraDependencies,
Compile / packageBin / packageOptions += Package.ManifestAttributes(
- "Automatic-Module-Name" -> "akka.persistence.cassandra"))
+ "Automatic-Module-Name" -> "akka.persistence.cassandra"))
.configs(MultiJvm)
lazy val cassandraLauncher = project
@@ -48,7 +48,7 @@ lazy val cassandraBundle = project
crossPaths := false,
autoScalaLibrary := false,
libraryDependencies += ("org.apache.cassandra" % "cassandra-all" % "3.11.3")
- .exclude("commons-logging", "commons-logging"),
+ .exclude("commons-logging", "commons-logging"),
dependencyOverrides += "com.github.jbellis" % "jamm" % "0.3.3", // See jamm comment in https://issues.apache.org/jira/browse/CASSANDRA-9608
assembly / target := target.value / "bundle" / "akka" / "persistence" / "cassandra" / "launcher",
assembly / assemblyJarName := "cassandra-bundle.jar")
@@ -69,19 +69,19 @@ lazy val endToEndExample = project
dockerUsername := Some("kubakka"),
dockerUpdateLatest := true,
// update if deploying to somewhere that can't see Docker Hub
- //dockerRepository := Some("some-registry"),
+ // dockerRepository := Some("some-registry"),
dockerCommands ++= Seq(
- Cmd("USER", "root"),
- Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", "busybox-extras", "curl", "iptables"),
- Cmd(
- "RUN",
- "/sbin/apk",
- "add",
- "--no-cache",
- "jattach",
- "--repository",
- "http://dl-cdn.alpinelinux.org/alpine/edge/community/"),
- Cmd("RUN", "chgrp -R 0 . && chmod -R g=u .")),
+ Cmd("USER", "root"),
+ Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", "busybox-extras", "curl", "iptables"),
+ Cmd(
+ "RUN",
+ "/sbin/apk",
+ "add",
+ "--no-cache",
+ "jattach",
+ "--repository",
+ "http://dl-cdn.alpinelinux.org/alpine/edge/community/"),
+ Cmd("RUN", "chgrp -R 0 . && chmod -R g=u .")),
// Docker image is only for running in k8s
Universal / javaOptions ++= Seq("-J-Dconfig.resource=kubernetes.conf"))
.enablePlugins(DockerPlugin, JavaAppPackaging)
@@ -104,30 +104,30 @@ lazy val docs = project
Preprocess / sourceDirectory := (LocalRootProject / ScalaUnidoc / unidoc / target).value,
Paradox / siteSubdirName := s"docs/akka-persistence-cassandra/${projectInfoVersion.value}",
Compile / paradoxProperties ++= Map(
- "project.url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current/",
- "canonical.base_url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current",
- "akka.version" -> Dependencies.AkkaVersion,
- // Akka
- "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
- "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
- "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
- // Alpakka
- "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
- "scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
- "javadoc.akka.stream.alpakka.base_url" -> "",
- // APC 0.x
- "extref.apc-0.x.base_url" -> s"https://doc.akka.io/docs/akka-persistence-cassandra/0.103/%s",
- // Cassandra
- "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
- // Datastax Java driver
- "extref.java-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.DriverVersionInDocs}/%s",
- "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.DriverVersionInDocs}/",
- // Java
- "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
- // Scala
- "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
- "scaladoc.akka.persistence.cassandra.base_url" -> s"/${(Preprocess / siteSubdirName).value}/",
- "javadoc.akka.persistence.cassandra.base_url" -> ""), // no Javadoc is published
+ "project.url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current/",
+ "canonical.base_url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current",
+ "akka.version" -> Dependencies.AkkaVersion,
+ // Akka
+ "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
+ "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
+ "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
+ // Alpakka
+ "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
+ "scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
+ "javadoc.akka.stream.alpakka.base_url" -> "",
+ // APC 0.x
+ "extref.apc-0.x.base_url" -> s"https://doc.akka.io/docs/akka-persistence-cassandra/0.103/%s",
+ // Cassandra
+ "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
+ // Datastax Java driver
+ "extref.java-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.DriverVersionInDocs}/%s",
+ "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.DriverVersionInDocs}/",
+ // Java
+ "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
+ // Scala
+ "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
+ "scaladoc.akka.persistence.cassandra.base_url" -> s"/${(Preprocess / siteSubdirName).value}/",
+ "javadoc.akka.persistence.cassandra.base_url" -> ""), // no Javadoc is published
paradoxGroups := Map("Language" -> Seq("Java", "Scala")),
ApidocPlugin.autoImport.apidocRootPackage := "akka",
resolvers += Resolver.jcenterRepo,
diff --git a/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala b/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
index 5e4fc35..ff82c8c 100644
--- a/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
+++ b/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
@@ -260,7 +260,7 @@ object CassandraLauncher {
println(
s"Starting Cassandra on port client port: $realPort storage port $storagePort host $realHost java version ${System
- .getProperty("java.runtime.version")}")
+ .getProperty("java.runtime.version")}")
// http://wiki.apache.org/cassandra/StorageConfiguration
val conf = readResource(configResource)
@@ -400,16 +400,17 @@ object CassandraLauncher {
val deadline = AwaitListenTimeout.fromNow
@annotation.tailrec
def tryConnect(): Unit = {
- val retry = try {
- new Socket(host, port).close()
- false
- } catch {
- case _: IOException if deadline.hasTimeLeft() =>
- Thread.sleep(AwaitListenPoll.toMillis)
- true
- case ioe: IOException =>
- throw new RuntimeException(s"Cassandra did not start within $AwaitListenTimeout", ioe)
- }
+ val retry =
+ try {
+ new Socket(host, port).close()
+ false
+ } catch {
+ case _: IOException if deadline.hasTimeLeft() =>
+ Thread.sleep(AwaitListenPoll.toMillis)
+ true
+ case ioe: IOException =>
+ throw new RuntimeException(s"Cassandra did not start within $AwaitListenTimeout", ioe)
+ }
if (retry) tryConnect()
}
tryConnect()
diff --git a/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala b/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
index b063d05..390bc3a 100644
--- a/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
@@ -68,7 +68,6 @@ object EventsByTagMigration {
}
/**
- *
* @param pluginConfigPath The config namespace where the plugin is configured, default is `akka.persistence.cassandra`
*/
class EventsByTagMigration(
diff --git a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
index e1394d9..9586da3 100644
--- a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
@@ -31,7 +31,6 @@ class KeyspaceAndTableStatements(
*
* This can be queried in for example a startup script without accessing the actual
* Cassandra plugin actor.
- *
*/
def createJournalTablesStatements: immutable.Seq[String] =
journalStatements.createTable ::
diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
index aeb2618..b61cb09 100644
--- a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
@@ -131,7 +131,6 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
* snapshot.
*
* @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots
- *
*/
def deleteBeforeSnapshot(
persistenceId: String,
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
index 71841bb..7f2239a 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
@@ -16,7 +16,8 @@ abstract class BaseCompactionStrategy(config: Config, className: String, propert
require(config.hasPath("class") && config.getString("class") == className, s"Config does not specify a $className")
require(
config.entrySet().asScala.map(_.getKey).forall(propertyKeys.contains(_)),
- s"Config contains properties not supported by a $className. Supported: $propertyKeys. Supplied: ${config.entrySet().asScala.map(_.getKey)}")
+ s"Config contains properties not supported by a $className. Supported: $propertyKeys. Supplied: ${config.entrySet().asScala.map(
+ _.getKey)}")
val enabled: Boolean =
if (config.hasPath("enabled")) config.getBoolean("enabled") else true
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
index a4be050..db7c05f 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
@@ -10,7 +10,8 @@ import com.typesafe.config.Config
* https://github.com/apache/cassandra/blob/cassandra-2.2/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
*/
class LeveledCompactionStrategy(config: Config)
- extends BaseCompactionStrategy(config, LeveledCompactionStrategy.ClassName, LeveledCompactionStrategy.propertyKeys) {
+ extends BaseCompactionStrategy(config, LeveledCompactionStrategy.ClassName,
+ LeveledCompactionStrategy.propertyKeys) {
val ssTableSizeInMB: Long =
if (config.hasPath("sstable_size_in_mb"))
config.getLong("sstable_size_in_mb")
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
index fb02ff1..d6108de 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
@@ -51,7 +51,7 @@ object SizeTieredCompactionStrategy extends CassandraCompactionStrategyConfig[Si
override def propertyKeys: List[String] =
(BaseCompactionStrategy.propertyKeys ++
- List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted
+ List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted
override def fromConfig(config: Config): SizeTieredCompactionStrategy =
new SizeTieredCompactionStrategy(config)
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
index 0e8d2c8..2c2aa2b 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
@@ -65,8 +65,8 @@ private[akka] case class Buffer(
// add them to pending, any time bucket changes will be detected later
copy(size = newSize, pending = pending :+ write)
} else if (nextBatch.headOption.exists(oldestEvent =>
- UUIDComparator.comparator
- .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
+ UUIDComparator.comparator
+ .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
// rare case where events have been received out of order, just re-build the buffer
require(pending.isEmpty)
val allWrites = (nextBatch :+ write).sortBy(_.events.head._1.timeUuid)(timeUuidOrdering)
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
index 9f6de3b..cb601de 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
@@ -6,7 +6,7 @@ package akka.persistence.cassandra.journal
import java.lang.{ Long => JLong }
import java.nio.ByteBuffer
-import java.util.{ UUID, HashMap => JHMap, Map => JMap }
+import java.util.{ HashMap => JHMap, Map => JMap, UUID }
import akka.Done
import akka.actor.SupervisorStrategy.Stop
@@ -266,7 +266,7 @@ import akka.stream.scaladsl.Source
writeMessages(serialized)
} else {
- //if persistAll was used, single AtomicWrite can already contain complete batch, so we need to regroup writes correctly
+ // if persistAll was used, single AtomicWrite can already contain complete batch, so we need to regroup writes correctly
val groups: List[List[SerializedAtomicWrite]] = groupedWrites(serialized.toList.reverse, Nil, Nil)
// execute the groups in sequence
@@ -301,7 +301,7 @@ import akka.stream.scaladsl.Source
toReturn
}
- //Regroup batches by payload size
+ // Regroup batches by payload size
@tailrec
private def groupedWrites(
reversed: List[SerializedAtomicWrite],
@@ -639,12 +639,11 @@ import akka.stream.scaladsl.Source
val boundSelectHighestSequenceNr = preparedSelectHighestSequenceNr.map(_.bind(persistenceId, partitionNr: JLong))
boundSelectHighestSequenceNr
.flatMap(selectOne)
- .map(
- row =>
- row
- .map(s =>
- PartitionInfo(partitionNr, minSequenceNr(partitionNr), math.min(s.getLong("sequence_nr"), maxSequenceNr)))
- .getOrElse(PartitionInfo(partitionNr, minSequenceNr(partitionNr), -1)))
+ .map(row =>
+ row
+ .map(s =>
+ PartitionInfo(partitionNr, minSequenceNr(partitionNr), math.min(s.getLong("sequence_nr"), maxSequenceNr)))
+ .getOrElse(PartitionInfo(partitionNr, minSequenceNr(partitionNr), -1)))
}
private def asyncHighestDeletedSequenceNumber(persistenceId: String): Future[Long] = {
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
index dc693c9..06e3604 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
@@ -76,8 +76,8 @@ import akka.persistence.cassandra.FutureDone
| WITH gc_grace_seconds =${eventsByTagSettings.tagTable.gcGraceSeconds}
| AND compaction = ${indent(eventsByTagSettings.tagTable.compactionStrategy.asCQL, " ")}
| ${if (eventsByTagSettings.tagTable.ttl.isDefined)
- "AND default_time_to_live = " + eventsByTagSettings.tagTable.ttl.get.toSeconds
- else ""}
+ "AND default_time_to_live = " + eventsByTagSettings.tagTable.ttl.get.toSeconds
+ else ""}
""".stripMargin.trim
def createTagsProgressTable: String =
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
index a444f64..6ea4c40 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
@@ -85,15 +85,14 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
else {
val completed: List[Future[Done]] =
tpr.tags.toList
- .map(
- tag =>
- tag -> serializeEvent(
- tpr.pr,
- tpr.tags,
- tpr.offset,
- settings.eventsByTagSettings.bucketSize,
- serialization,
- system))
+ .map(tag =>
+ tag -> serializeEvent(
+ tpr.pr,
+ tpr.tags,
+ tpr.offset,
+ settings.eventsByTagSettings.bucketSize,
+ serialization,
+ system))
.map {
case (tag, serializedFut) =>
serializedFut.map { serialized =>
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala b/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
index 0cc9a39..a947ade 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
@@ -27,7 +27,7 @@ import akka.annotation.InternalApi
def receive = {
case Tick =>
for ((msg, clients) <- repeated;
- client <- clients) {
+ client <- clients) {
delegate.tell(msg, client)
}
seen.clear()
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
index c74630a..93b81f2 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
@@ -344,7 +344,7 @@ import scala.util.{ Failure, Success, Try }
throw new IllegalStateException(
s"Expected events to be ordered by seqNr. ${event.persistenceId} " +
s"Events: ${writes.nextBatch.map(e =>
- (e.events.head._1.persistenceId, e.events.head._1.sequenceNr, e.events.head._1.timeUuid))}")
+ (e.events.head._1.persistenceId, e.events.head._1.sequenceNr, e.events.head._1.timeUuid))}")
acc + (event.persistenceId -> PidProgress(from, event.sequenceNr, tagPidSequenceNr, event.timeUuid))
case None =>
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
index f5b1c14..4374cd9 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
@@ -106,10 +106,9 @@ import scala.util.Try
def writeProgress(tag: Tag, persistenceId: String, seqNr: Long, tagPidSequenceNr: Long, offset: UUID)(
implicit ec: ExecutionContext): Future[Done] = {
WriteTagProgress
- .map(
- ps =>
- ps.bind(persistenceId, tag, seqNr: JLong, tagPidSequenceNr: JLong, offset)
- .setExecutionProfileName(writeProfile))
+ .map(ps =>
+ ps.bind(persistenceId, tag, seqNr: JLong, tagPidSequenceNr: JLong, offset)
+ .setExecutionProfileName(writeProfile))
.flatMap(executeWrite)
}
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index edcfe8e..f8d1471 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -452,7 +452,6 @@ import akka.persistence.cassandra.PluginSettings
}
case QueryIdle | _: QueryInProgress | _: QueryResult => // ok
-
}
}
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
index ddc3882..ecbc797 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
@@ -99,10 +99,10 @@ import scala.compat.java8.FutureConverters._
implicit ec: ExecutionContext,
scheduler: Scheduler): Future[AsyncResultSet] = {
Retries.retry({ () =>
- val bound =
- statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
- session.executeAsync(bound).toScala
- }, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
+ val bound =
+ statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
+ session.executeAsync(bound).toScala
+ }, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
}
}
@@ -121,7 +121,6 @@ import scala.compat.java8.FutureConverters._
final case class MissingData(maxOffset: UUID, maxSequenceNr: TagPidSequenceNr)
/**
- *
* @param queryPrevious Should the previous bucket be queried right away. Searches go back one bucket, first the current bucket then
* the previous bucket. This is repeated every refresh interval.
* @param gapDetected Whether an explicit gap has been detected e.g. events 1-4 have been seen then the next event is not 5.
@@ -190,7 +189,8 @@ import scala.compat.java8.FutureConverters._
// override to give nice offset formatting
override def toString: String =
- s"""StageState(state: $state, fromOffset: ${formatOffset(fromOffset)}, toOffset: ${formatOffset(toOffset)}, tagPidSequenceNrs: $tagPidSequenceNrs, missingLookup: $missingLookup, bucketSize: $bucketSize)"""
+ s"""StageState(state: $state, fromOffset: ${formatOffset(fromOffset)}, toOffset: ${formatOffset(
+ toOffset)}, tagPidSequenceNrs: $tagPidSequenceNrs, missingLookup: $missingLookup, bucketSize: $bucketSize)"""
}
private val uuidRowOrdering = new Ordering[UUIDRow] {
@@ -377,9 +377,10 @@ import scala.compat.java8.FutureConverters._
}
override def preStart(): Unit = {
- stageState = StageState(QueryIdle, initialQueryOffset, calculateToOffset(), initialTagPidSequenceNrs.transform {
- case (_, (tagPidSequenceNr, offset)) => (tagPidSequenceNr, offset, System.currentTimeMillis())
- }, delayedScanInProgress = false, System.currentTimeMillis(), None, bucketSize)
+ stageState = StageState(QueryIdle, initialQueryOffset, calculateToOffset(),
+ initialTagPidSequenceNrs.transform {
+ case (_, (tagPidSequenceNr, offset)) => (tagPidSequenceNr, offset, System.currentTimeMillis())
+ }, delayedScanInProgress = false, System.currentTimeMillis(), None, bucketSize)
if (log.isInfoEnabled) {
log.info(
s"[{}]: EventsByTag query [${session.tag}] starting with EC delay {}ms: fromOffset [{}] toOffset [{}]",
@@ -525,7 +526,8 @@ import scala.compat.java8.FutureConverters._
.selectEventsForBucket(
stageState.currentTimeBucket,
stageState.fromOffset,
- stageState.toOffset, { (attempt, t, nextRetry) =>
+ stageState.toOffset,
+ { (attempt, t, nextRetry) =>
if (log.isWarningEnabled) {
log.warning(
s"[{}] Query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ${attempt}. Next retry in: ${nextRetry.pretty}. Reason: ${t.getMessage}",
@@ -566,7 +568,8 @@ import scala.compat.java8.FutureConverters._
.selectEventsForBucket(
missing.bucket,
missing.minOffset,
- missing.maxOffset, { (attempt, t, nextRetry) =>
+ missing.maxOffset,
+ { (attempt, t, nextRetry) =>
if (log.isWarningEnabled) {
log.warning(
s"[{}] Looking for missing query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ${attempt}. Next retry in: ${nextRetry.pretty}. Reason: ${t.getMessage}",
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
index 8102598..60b34f1 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
@@ -67,8 +67,8 @@ import akka.stream.scaladsl.Sink
toOffset: UUID,
bucketSize: BucketSize,
scanningPeriod: FiniteDuration,
- whichToKeep: (TagPidSequenceNr, TagPidSequenceNr) => TagPidSequenceNr)
- : Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
+ whichToKeep: (TagPidSequenceNr,
+ TagPidSequenceNr) => TagPidSequenceNr): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
def doIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
index 8b90198..216f47e 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
@@ -44,7 +44,6 @@ object CassandraReadJournal {
* Configuration settings can be defined in the configuration section with the
* absolute path corresponding to the identifier, which is `"akka.persistence.cassandra.query"`
* for the default [[CassandraReadJournal#Identifier]]. See `reference.conf`.
- *
*/
class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query.scaladsl.CassandraReadJournal)
extends ReadJournal
@@ -154,7 +153,6 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
*
* Use `NoOffset` when you want all events from the beginning of time.
* To acquire an offset from a long unix timestamp to use with this query, you can use [[timeBasedUUIDFrom]].
- *
*/
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
scaladslReadJournal.currentEventsByTag(tag, offset).asJava
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/package.scala b/core/src/main/scala/akka/persistence/cassandra/query/package.scala
index e143ffb..0c6ec42 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/package.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/package.scala
@@ -35,10 +35,10 @@ package object query {
val timestamp = (time - uuidEpoch) * 10000
var msb = 0L
- msb |= (0X00000000FFFFFFFFL & timestamp) << 32
- msb |= (0X0000FFFF00000000L & timestamp) >>> 16
- msb |= (0X0FFF000000000000L & timestamp) >>> 48
- msb |= 0X0000000000001000L // sets the version to 1.
+ msb |= (0x00000000FFFFFFFFL & timestamp) << 32
+ msb |= (0x0000FFFF00000000L & timestamp) >>> 16
+ msb |= (0x0FFF000000000000L & timestamp) >>> 48
+ msb |= 0x0000000000001000L // sets the version to 1.
msb
}
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
index 7045cb8..e4ed50d 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
@@ -479,7 +479,6 @@ class CassandraReadJournal protected (
*
* Use `NoOffset` when you want all events from the beginning of time.
* To acquire an offset from a long unix timestamp to use with this query, you can use [[timeBasedUUIDFrom]].
- *
*/
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
currentEventsByTagInternal(tag, offset)
@@ -654,8 +653,8 @@ class CassandraReadJournal protected (
fastForwardEnabled))
.named(name)
}.mapAsync(querySettings.deserializationParallelism) { row =>
- extractor.extract(row, deserializeEventAsync)
- }
+ extractor.extract(row, deserializeEventAsync)
+ }
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
}
diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index bf61c26..46b71aa 100644
--- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -193,20 +193,22 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
}
}
- /** Plugin API: deletes all snapshots matching `criteria`. This call is protected with a circuit-breaker.
+ /**
+ * Plugin API: deletes all snapshots matching `criteria`. This call is protected with a circuit-breaker.
*
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for deleting. If no timestamp constraints are specified this routine
* @note Due to the limitations of Cassandra deletion requests, this routine makes an initial query in order to obtain the
* records matching the criteria which are then deleted in a batch deletion. Improvements in Cassandra v3.0+ mean a single
* range deletion on the sequence number is used instead, except if timestamp constraints are specified, which still
- * requires the original two step routine.*/
+ * requires the original two step routine.
+ */
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
session.serverMetaData.flatMap { meta =>
if (meta.isVersion2
- || settings.cosmosDb
- || 0L < criteria.minTimestamp
- || criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
+ || settings.cosmosDb
+ || 0L < criteria.minTimestamp
+ || criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
preparedSelectSnapshotMetadata.flatMap { snapshotMetaPs =>
// this meta query gets slower than slower if snapshots are deleted without a criteria.minSequenceNr as
// all previous tombstones are scanned in the meta data query
diff --git a/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala b/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
index 6a162fd..ee800c6 100644
--- a/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
+++ b/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
@@ -3,16 +3,16 @@ package akka.cluster.persistence.cassandra
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
-import akka.actor.{Actor, ActorRef, ActorSystem, Address, RootActorPath}
-import akka.cluster.{Cluster, ClusterReadView, MemberStatus, _}
+import akka.actor.{ Actor, ActorRef, ActorSystem, Address, RootActorPath }
+import akka.cluster.{ Cluster, ClusterReadView, MemberStatus, _ }
import akka.event.Logging.ErrorLevel
import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{FlightRecordingSupport, MultiNodeSpec}
+import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec }
import akka.testkit.TestEvent._
import akka.testkit._
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.exceptions.TestCanceledException
-import org.scalatest.{Canceled, Outcome, Suite}
+import org.scalatest.{ Canceled, Outcome, Suite }
import scala.collection.immutable
import scala.concurrent.duration._
@@ -21,8 +21,9 @@ import scala.language.implicitConversions
object MultiNodeClusterSpec {
def clusterConfigWithFailureDetectorPuppet: Config =
- ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").
- withFallback(clusterConfig)
+ ConfigFactory.parseString(
+ "akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").withFallback(
+ clusterConfig)
def clusterConfig(failureDetectorPuppet: Boolean): Config =
if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig
@@ -72,20 +73,20 @@ object MultiNodeClusterSpec {
class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
import EndActor._
def receive: Receive = {
- case SendEnd ⇒
- target foreach { t ⇒
+ case SendEnd =>
+ target.foreach { t =>
context.actorSelection(RootActorPath(t) / self.path.elements) ! End
}
- case End ⇒
- testActor forward End
+ case End =>
+ testActor.forward(End)
sender() ! EndAck
- case EndAck ⇒
- testActor forward EndAck
+ case EndAck =>
+ testActor.forward(EndAck)
}
}
}
-trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordingSupport { self: MultiNodeSpec ⇒
+trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordingSupport { self: MultiNodeSpec =>
override def initialParticipants = roles.size
@@ -111,9 +112,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
".*Cluster Node.* - is starting up.*",
".*Shutting down cluster Node.*",
".*Cluster node successfully shut down.*",
- ".*Using a dedicated scheduler for cluster.*") foreach { s ⇒
- sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
- }
+ ".*Using a dedicated scheduler for cluster.*").foreach { s =>
+ sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
+ }
muteDeadLetters(
classOf[akka.actor.PoisonPill],
@@ -154,11 +155,11 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
*/
implicit def address(role: RoleName): Address = {
cachedAddresses.get(role) match {
- case null ⇒
+ case null =>
val address = node(role).address
cachedAddresses.put(role, address)
address
- case address ⇒ address
+ case address => address
}
}
@@ -188,7 +189,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
*/
def startClusterNode(): Unit = {
if (clusterView.members.isEmpty) {
- cluster join myself
+ cluster.join(myself)
awaitAssert(clusterView.members.map(_.address) should contain(address(myself)))
} else
clusterView.self
@@ -221,19 +222,20 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
*/
def joinWithin(joinNode: RoleName, max: Duration = remainingOrDefault, interval: Duration = 1.second): Unit = {
def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
- clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }
+ clusterView.members.exists { m => (m.address == member) && status.contains(m.status) }
cluster.join(joinNode)
- awaitCond({
- clusterView.refreshCurrentState()
- if (memberInState(joinNode, List(MemberStatus.up)) &&
- memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
- true
- else {
- cluster.join(joinNode)
- false
- }
- }, max, interval)
+ awaitCond(
+ {
+ clusterView.refreshCurrentState()
+ if (memberInState(joinNode, List(MemberStatus.up)) &&
+ memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
+ true
+ else {
+ cluster.join(joinNode)
+ false
+ }
+ }, max, interval)
}
/**
@@ -243,7 +245,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
val members = gotMembers.toIndexedSeq
members.size should ===(expectedAddresses.length)
- expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address should ===(a) }
+ expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) => members(i).address should ===(a) }
}
/**
@@ -269,14 +271,14 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
*/
def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
if (nodesInCluster.contains(myself)) {
- nodesInCluster.length should not be (0)
+ nodesInCluster.length should not be 0
val expectedLeader = roleOfLeader(nodesInCluster)
val leader = clusterView.leader
val isLeader = leader == Some(clusterView.selfAddress)
assert(
isLeader == isNode(expectedLeader),
"expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
- clusterView.status should (be(MemberStatus.Up) or be(MemberStatus.Leaving))
+ clusterView.status should (be(MemberStatus.Up).or(be(MemberStatus.Leaving)))
}
/**
@@ -284,17 +286,17 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
* Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring.
*/
def awaitMembersUp(
- numberOfMembers: Int,
- canNotBePartOfMemberRing: Set[Address] = Set.empty,
- timeout: FiniteDuration = 25.seconds): Unit = {
+ numberOfMembers: Int,
+ canNotBePartOfMemberRing: Set[Address] = Set.empty,
+ timeout: FiniteDuration = 25.seconds): Unit = {
within(timeout) {
if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
- awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) should not contain (a)))
+ awaitAssert(canNotBePartOfMemberRing.foreach(a => clusterView.members.map(_.address) should not contain a))
awaitAssert(clusterView.members.size should ===(numberOfMembers))
awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up)))
// clusterView.leader is updated by LeaderChanged, await that to be updated also
val expectedLeader = clusterView.members.collectFirst {
- case m if m.dataCenter == cluster.settings.SelfDataCenter ⇒ m.address
+ case m if m.dataCenter == cluster.settings.SelfDataCenter => m.address
}
awaitAssert(clusterView.leader should ===(expectedLeader))
}
@@ -307,7 +309,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
* Wait until the specified nodes have seen the same gossip overview.
*/
def awaitSeenSameState(addresses: Address*): Unit =
- awaitAssert((addresses.toSet diff clusterView.seenBy) should ===(Set.empty))
+ awaitAssert((addresses.toSet.diff(clusterView.seenBy)) should ===(Set.empty))
/**
* Leader according to the address ordering of the roles.
@@ -318,7 +320,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
* be determined from the `RoleName`.
*/
def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
- nodesInCluster.length should not be (0)
+ nodesInCluster.length should not be 0
nodesInCluster.sorted.head
}
@@ -332,7 +334,4 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)
-
}
-
-
diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
index 0f0fb98..8dcd930 100644
--- a/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
@@ -54,7 +54,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
- eventTags.foreach({ tag =>
+ eventTags.foreach { tag =>
try {
validateTagStream(readJournal)(tag)
} catch {
@@ -66,7 +66,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
throw new RuntimeException("Only passed the second time")
}
- })
+ }
}
}
@@ -78,14 +78,15 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
probe.request(messagesPerPersistenceId * nrPersistenceIds)
(1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { i: Long =>
- val event = try {
- probe.expectNext(veryLongWait)
- } catch {
- case e: AssertionError =>
- system.log.error(e, s"Failed to get event: $i")
- allReceived.filter(_._2.size != messagesPerPersistenceId).foreach(p => system.log.info("{}", p))
- throw e
- }
+ val event =
+ try {
+ probe.expectNext(veryLongWait)
+ } catch {
+ case e: AssertionError =>
+ system.log.error(e, s"Failed to get event: $i")
+ allReceived.filter(_._2.size != messagesPerPersistenceId).foreach(p => system.log.info("{}", p))
+ throw e
+ }
allReceived += (event.persistenceId -> (event.sequenceNr :: allReceived(event.persistenceId)))
var fail = false
diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
index 341aa18..702ee40 100644
--- a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
@@ -179,8 +179,8 @@ abstract class CassandraSpec(
.asScala
.foreach(row => {
println(s"""Row:${row.getString("tag_name")},${row.getLong("timebucket")},${formatOffset(
- row.getUuid("timestamp"))},${row.getString("persistence_id")},${row
- .getLong("tag_pid_sequence_nr")},${row.getLong("sequence_nr")}""")
+ row.getUuid("timestamp"))},${row.getString("persistence_id")},${row
+ .getLong("tag_pid_sequence_nr")},${row.getLong("sequence_nr")}""")
})
}
@@ -190,7 +190,7 @@ abstract class CassandraSpec(
.asScala
.foreach(row => {
println(s"""Row:${row.getLong("partition_nr")}, ${row.getString("persistence_id")}, ${row.getLong(
- "sequence_nr")}""")
+ "sequence_nr")}""")
})
println("snapshots")
diff --git a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
index f1c9eb6..23958ba 100644
--- a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
@@ -69,7 +69,7 @@ class EventsByTagMigrationProvidePersistenceIds extends AbstractEventsByTagMigra
val pidOne = "pOne"
val pidTwo = "pTwo"
- "support migrating a subset of persistenceIds" taggedAs (RequiresCassandraThree) in {
+ "support migrating a subset of persistenceIds" taggedAs RequiresCassandraThree in {
writeOldTestEventWithTags(PersistentRepr("e-1", 1, pidOne), Set("blue"))
writeOldTestEventWithTags(PersistentRepr("e-2", 2, pidOne), Set("blue"))
writeOldTestEventWithTags(PersistentRepr("f-1", 1, pidTwo), Set("blue"))
@@ -113,7 +113,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
val pidWithSnapshot = "pidSnapshot"
val pidExcluded = "pidExcluded"
- "have some existing tagged messages" taggedAs (RequiresCassandraThree) in {
+ "have some existing tagged messages" taggedAs RequiresCassandraThree in {
// this one uses the 0.7 schema, soo old.
writeOldTestEventInMessagesColumn(PersistentRepr("e-1", 1L, pidOne), Set("blue", "green", "orange"))
@@ -138,7 +138,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
reconciler.rebuildAllPersistenceIds().futureValue
}
- "allow creation of the new tags view table" taggedAs (RequiresCassandraThree) in {
+ "allow creation of the new tags view table" taggedAs RequiresCassandraThree in {
migrator.createTables().futureValue shouldEqual Done
}
@@ -146,7 +146,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
migrator.migrateToTagViews(filter = _ != pidExcluded).futureValue shouldEqual Done
}
- "be idempotent so it can be restarted" taggedAs (RequiresCassandraThree) in {
+ "be idempotent so it can be restarted" taggedAs RequiresCassandraThree in {
// add some more events to be picked up
writeOldTestEventWithTags(PersistentRepr("f-1", 1L, pidTwo), Set("green"))
writeOldTestEventWithTags(PersistentRepr("f-2", 2L, pidTwo), Set("blue"))
@@ -159,11 +159,11 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
reconciler.rebuildAllPersistenceIds().futureValue
}
- "allow a second migration to resume from where the last one got to" taggedAs (RequiresCassandraThree) in {
+ "allow a second migration to resume from where the last one got to" taggedAs RequiresCassandraThree in {
migrator.migrateToTagViews(filter = _ != pidExcluded).futureValue shouldEqual Done
}
- "migrate events missed during the large migration as part of actor recovery" taggedAs (RequiresCassandraThree) in {
+ "migrate events missed during the large migration as part of actor recovery" taggedAs RequiresCassandraThree in {
// these events mimic the old version still running and persisting events
writeOldTestEventWithTags(PersistentRepr("f-3", 3L, pidTwo), Set("green"))
writeOldTestEventWithTags(PersistentRepr("f-4", 4L, pidTwo), Set("blue"))
@@ -172,11 +172,11 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
reconciler.rebuildAllPersistenceIds().futureValue
}
- "allow adding of the new tags column" taggedAs (RequiresCassandraThree) in {
+ "allow adding of the new tags column" taggedAs RequiresCassandraThree in {
migrator.addTagsColumn().futureValue shouldEqual Done
}
- "work with the current implementation" taggedAs (RequiresCassandraThree) in {
+ "work with the current implementation" taggedAs RequiresCassandraThree in {
val blueSrc: Source[EventEnvelope, NotUsed] = queries.eventsByTag("blue", NoOffset)
val blueProbe = blueSrc.runWith(TestSink.probe[Any])
blueProbe.request(5)
@@ -226,7 +226,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
excludedProbe.cancel()
}
- "see events missed by migration if the persistent actor is started" taggedAs (RequiresCassandraThree) in {
+ "see events missed by migration if the persistent actor is started" taggedAs RequiresCassandraThree in {
val probe = TestProbe()
systemTwo.actorOf(TestTaggingActor.props(pidTwo, probe = Some(probe.ref)))
probe.expectMsg(RecoveryCompleted)
@@ -252,19 +252,19 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
}
// This will be left as a manual step for the user as it stops
// rolling back to the old version
- "allow dropping of the materialized view" taggedAs (RequiresCassandraThree) in {
+ "allow dropping of the materialized view" taggedAs RequiresCassandraThree in {
system.log.info("Dropping old materialzied view")
cluster.execute(SimpleStatement.newInstance(s"DROP MATERIALIZED VIEW $eventsByTagViewName"))
system.log.info("Dropped old materialzied view")
}
- "have a peek in the messages table" taggedAs (RequiresCassandraThree) in {
+ "have a peek in the messages table" taggedAs RequiresCassandraThree in {
val row = cluster.execute(SimpleStatement.newInstance(s"select * from ${messagesTableName} limit 1")).one()
system.log.debug("New messages table looks like: {}", row)
system.log.debug("{}", row.getColumnDefinitions)
}
- "be able to add tags to existing pids" taggedAs (RequiresCassandraThree) in {
+ "be able to add tags to existing pids" taggedAs RequiresCassandraThree in {
// we need a new actor system for this as the old one will have prepared the statements without
// the tags column existing
val pidOnePA = systemTwo.actorOf(TestTaggingActor.props(pidOne, Set("blue", "yellow")))
@@ -283,7 +283,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
blueProbe.expectNextPF {
case EventEnvelope(_, `pidWithMeta`, 1, "g-1") =>
}
- blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 4, "f-4") => }
+ blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 4, "f-4") => }
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 5, "new-event-1") => }
blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 6, "new-event-2") => }
blueProbe.expectNoMessage(waitTime)
@@ -293,13 +293,13 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
// Again a manual step, leaving them is only wasting disk space
// the new version will work with these columns still there
- "allow dropping of tag columns" taggedAs (RequiresCassandraThree) in {
+ "allow dropping of tag columns" taggedAs RequiresCassandraThree in {
cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag1")
cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag2")
cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag3")
}
- "still work after dropping the tag columns" taggedAs (RequiresCassandraThree) in {
+ "still work after dropping the tag columns" taggedAs RequiresCassandraThree in {
val pidTwoPA = systemThree.actorOf(TestTaggingActor.props(pidTwo, Set("orange")))
pidTwoPA ! "new-event-1"
expectMsg(Ack)
@@ -309,7 +309,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
val orangeSrc: Source[EventEnvelope, NotUsed] = queriesThree.eventsByTag("orange", NoOffset)
val orangeProbe = orangeSrc.runWith(TestSink.probe[Any])(SystemMaterializer(systemThree).materializer)
orangeProbe.request(3)
- orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
+ orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 5, "new-event-1") => }
orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 6, "new-event-2") => }
orangeProbe.expectNoMessage(waitTime)
diff --git a/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala b/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
index be88d4d..3241db6 100644
--- a/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
@@ -29,9 +29,9 @@ class RetriesSpec
@volatile var called = 0
val result = Retries
.retry(() => {
- called += 1
- Future.failed(new RuntimeException(s"cats $called"))
- }, 3, (_, exc, _) => failProbe.ref ! exc, 1.milli, 2.millis, 0.1)
+ called += 1
+ Future.failed(new RuntimeException(s"cats $called"))
+ }, 3, (_, exc, _) => failProbe.ref ! exc, 1.milli, 2.millis, 0.1)
.failed
.futureValue
called shouldEqual 3
diff --git a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
index 40d85b8..7cc691e 100644
--- a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
@@ -96,7 +96,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
cleanup.deleteAllEvents(pid, neverUsePersistenceIdAgain = true).futureValue
// also delete from all_persistence_ids
- queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain (pid)
+ queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain pid
val p2 = system.actorOf(TestActor.props(pid))
p2 ! GetRecoveredState
@@ -192,7 +192,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
cleanup.deleteAll(pid, neverUsePersistenceIdAgain = true).futureValue
// also delete from all_persistence_ids
- queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain (pid)
+ queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain pid
val p2 = system.actorOf(TestActor.props(pid))
p2 ! GetRecoveredState
@@ -202,7 +202,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
queries.currentEventsByTag(tag = "tag-b", offset = NoOffset).runWith(Sink.seq).futureValue.size should ===(0)
}
- "delete some for one persistenceId" taggedAs (RequiresCassandraThree) in {
+ "delete some for one persistenceId" taggedAs RequiresCassandraThree in {
val pid = nextPid
val p = system.actorOf(TestActor.props(pid))
(1 to 8).foreach { i =>
@@ -219,7 +219,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
expectMsg(RecoveredState("", List("evt-6", "evt-7", "evt-8"), 8L))
}
- "clean up before latest snapshot for one persistence id" taggedAs (RequiresCassandraThree) in {
+ "clean up before latest snapshot for one persistence id" taggedAs RequiresCassandraThree in {
val pid = nextPid
val p = system.actorOf(TestActor.props(pid))
(1 to 3).foreach { i =>
@@ -259,7 +259,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
.futureValue shouldEqual List("evt-7", "evt-8", "evt-9")
}
- "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs (RequiresCassandraThree) in {
+ "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs RequiresCassandraThree in {
val pid = nextPid
val p = system.actorOf(TestActor.props(pid))
(1 to 3).foreach { i =>
@@ -299,7 +299,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
.futureValue shouldEqual List("evt-4", "evt-5", "evt-6", "evt-7", "evt-8", "evt-9")
}
- "clean up before snapshot including timestamp for one persistence id" taggedAs (RequiresCassandraThree) in {
+ "clean up before snapshot including timestamp for one persistence id" taggedAs RequiresCassandraThree in {
val pid = nextPid
val p = system.actorOf(TestActor.props(pid))
(1 to 3).foreach { i =>
@@ -440,7 +440,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
}
"Time and snapshot based cleanup" must {
- "keep the correct number of snapshots" taggedAs (RequiresCassandraThree) in {
+ "keep the correct number of snapshots" taggedAs RequiresCassandraThree in {
val cleanup = new Cleanup(system)
val pid = nextPid
writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -457,7 +457,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000))
}
- "keep the all snapshots if fewer than requested without timestamp" taggedAs (RequiresCassandraThree) in {
+ "keep the all snapshots if fewer than requested without timestamp" taggedAs RequiresCassandraThree in {
val cleanup = new Cleanup(system)
val pid = nextPid
writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -477,7 +477,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 1, 1000))
}
- "keep the all snapshots if fewer than requested with timestamp" taggedAs (RequiresCassandraThree) in {
+ "keep the all snapshots if fewer than requested with timestamp" taggedAs RequiresCassandraThree in {
val cleanup = new Cleanup(system)
val pid = nextPid
writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -512,7 +512,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
oldestSnapshot shouldEqual None
}
- "don't delete snapshots newer than the oldest date" taggedAs (RequiresCassandraThree) in {
+ "don't delete snapshots newer than the oldest date" taggedAs RequiresCassandraThree in {
val cleanup = new Cleanup(system)
val pid = nextPid
writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -534,7 +534,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000))
}
- "keep snapshots older than the oldest date to meet snapshotsToKeep" taggedAs (RequiresCassandraThree) in {
+ "keep snapshots older than the oldest date to meet snapshotsToKeep" taggedAs RequiresCassandraThree in {
val cleanup = new Cleanup(system)
val pid = nextPid
writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
diff --git a/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala b/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
index 3b54095..dd728c3 100644
--- a/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
@@ -47,7 +47,7 @@ class CassandraCompactionStrategySpec
compactionStrategy.compactionWindowUnit shouldEqual TimeUnit.DAYS
}
- "successfully create CQL from TimeWindowCompactionStrategy" taggedAs (RequiresCassandraThree) in {
+ "successfully create CQL from TimeWindowCompactionStrategy" taggedAs RequiresCassandraThree in {
val twConfig = ConfigFactory.parseString("""journal.table-compaction-strategy {
| class = "TimeWindowCompactionStrategy"
| compaction_window_size = 1
@@ -57,7 +57,7 @@ class CassandraCompactionStrategySpec
val cqlExpression =
s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable1 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
- twConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+ twConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
noException must be thrownBy {
cluster.execute(cqlExpression)
@@ -112,7 +112,7 @@ class CassandraCompactionStrategySpec
val cqlExpression =
s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable2 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
- uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+ uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
noException must be thrownBy {
cluster.execute(cqlExpression)
@@ -165,7 +165,7 @@ class CassandraCompactionStrategySpec
val cqlExpression =
s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable3 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
- uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+ uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
noException must be thrownBy {
cluster.execute(cqlExpression)
diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
index 254452e..1eab4c1 100644
--- a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
@@ -59,10 +59,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
"Cassandra query EventsByPersistenceId" must {
"not replay dropped events by the event-adapter" in {
- setup("a", 6, {
- case x if x % 2 == 0 => "dropped:"
- case _ => ""
- })
+ setup("a", 6,
+ {
+ case x if x % 2 == 0 => "dropped:"
+ case _ => ""
+ })
val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
src
@@ -78,10 +79,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
"replay duplicate events by the event-adapter" in {
- setup("b", 3, {
- case x if x % 2 == 0 => "duplicated:"
- case _ => ""
- })
+ setup("b", 3,
+ {
+ case x if x % 2 == 0 => "duplicated:"
+ case _ => ""
+ })
val src = queries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNext("b-1", "b-2", "b-2", "b-3").expectComplete()
@@ -100,10 +102,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
"Cassandra query EventsByTag" must {
"not replay events dropped by the event-adapter" in {
- setup("d", 6, tagged("red") {
- case x if x % 2 == 0 => "dropped:"
- case _ => ""
- })
+ setup("d", 6,
+ tagged("red") {
+ case x if x % 2 == 0 => "dropped:"
+ case _ => ""
+ })
val src = queries.eventsByTag("red", NoOffset)
val sub = src.map(_.event).runWith(TestSink.probe[Any])
@@ -115,10 +118,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
"replay events duplicated by the event-adapter" in {
- setup("e", 3, tagged("yellow") {
- case x if x % 2 == 0 => "duplicated:"
- case _ => ""
- })
+ setup("e", 3,
+ tagged("yellow") {
+ case x if x % 2 == 0 => "duplicated:"
+ case _ => ""
+ })
val src = queries.eventsByTag("yellow", NoOffset)
val sub = src.map(_.event).runWith(TestSink.probe[Any])
@@ -129,10 +133,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
"replay events transformed by the event-adapter" in {
- setup("e", 3, tagged("green") {
- case x if x % 2 == 0 => "prefixed:foo:"
- case _ => ""
- })
+ setup("e", 3,
+ tagged("green") {
+ case x if x % 2 == 0 => "prefixed:foo:"
+ case _ => ""
+ })
val src = queries.eventsByTag("green", NoOffset)
val sub = src.map(_.event).runWith(TestSink.probe[Any])
diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
index c37cdc8..92c20ed 100644
--- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
@@ -200,7 +200,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
val probe = greenSrc.runWith(TestSink.probe[Any])
probe.request(2)
- probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
probe.expectNoMessage(500.millis)
probe.request(2)
@@ -233,7 +233,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
val probe = greenSrc.runWith(TestSink.probe[Any])
probe.request(2)
- probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
probe.expectNoMessage(waitTime)
@@ -274,7 +274,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
if (appleTimestamp == bananaTimestamp)
probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
- probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
+ probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
probe2.cancel()
}
@@ -349,22 +349,23 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
"find new events" in {
val d = system.actorOf(TestActor.props("d"))
- withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(2)
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car") => e }
- probe.expectNoMessage(waitTime)
+ withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(2)
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car") => e }
+ probe.expectNoMessage(waitTime)
- d ! "a black dog"
- expectMsg(s"a black dog-done")
- d ! "a black night"
- expectMsg(s"a black night-done")
+ d ! "a black dog"
+ expectMsg(s"a black dog-done")
+ d ! "a black night"
+ expectMsg(s"a black night-done")
- probe.expectNextPF { case e @ EventEnvelope(_, "d", 1L, "a black dog") => e }
- probe.expectNoMessage(waitTime)
- probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "d", 2L, "a black night") => e }
- probe.cancel()
- })
+ probe.expectNextPF { case e @ EventEnvelope(_, "d", 1L, "a black dog") => e }
+ probe.expectNoMessage(waitTime)
+ probe.request(10)
+ probe.expectNextPF { case e @ EventEnvelope(_, "d", 2L, "a black night") => e }
+ probe.cancel()
+ })
}
"find events from timestamp offset" in {
@@ -394,8 +395,8 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
probe2.request(10)
if (appleTimestamp == bananaTimestamp)
probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
- probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
- probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
+ probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
+ probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
probe2.expectNoMessage(waitTime)
})
@@ -404,19 +405,20 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
}
"find events from UUID offset " in {
- withProbe(queries.eventsByTag(tag = "green", offset = NoOffset).runWith(TestSink.probe[Any]), probe1 => {
- probe1.request(2)
- probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
- val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }.offset
- probe1.cancel()
-
- val greenSrc2 = queries.eventsByTag(tag = "green", offs)
- val probe2 = greenSrc2.runWith(TestSink.probe[Any])
- probe2.request(10)
- probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
- probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
- probe2.expectNoMessage(waitTime)
- })
+ withProbe(queries.eventsByTag(tag = "green", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe1 => {
+ probe1.request(2)
+ probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
+ val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }.offset
+ probe1.cancel()
+
+ val greenSrc2 = queries.eventsByTag(tag = "green", offs)
+ val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+ probe2.request(10)
+ probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
+ probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
+ probe2.expectNoMessage(waitTime)
+ })
}
"include timestamp in EventEnvelope" in {
@@ -449,21 +451,22 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
val pr2 = PersistentRepr("e2", 2L, "p1", "", writerUuid = w1)
writeTaggedEvent(t2, pr2, Set("T1-live"), 2, bucketSize)
- withProbe(queries.eventsByTag(tag = "T1-live", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
-
- val t3 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
- val pr3 = PersistentRepr("e3", 3L, "p1", "", writerUuid = w1)
- writeTaggedEvent(t3, pr3, Set("T1-live"), 3, bucketSize)
- val t4 = LocalDateTime.now(ZoneOffset.UTC)
- val pr4 = PersistentRepr("e4", 4L, "p1", "", writerUuid = w1)
- writeTaggedEvent(t4, pr4, Set("T1-live"), 4, bucketSize)
-
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "e3") => e }
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 4L, "e4") => e }
- })
+ withProbe(queries.eventsByTag(tag = "T1-live", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(10)
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
+
+ val t3 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
+ val pr3 = PersistentRepr("e3", 3L, "p1", "", writerUuid = w1)
+ writeTaggedEvent(t3, pr3, Set("T1-live"), 3, bucketSize)
+ val t4 = LocalDateTime.now(ZoneOffset.UTC)
+ val pr4 = PersistentRepr("e4", 4L, "p1", "", writerUuid = w1)
+ writeTaggedEvent(t4, pr4, Set("T1-live"), 4, bucketSize)
+
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "e3") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 4L, "e4") => e }
+ })
}
"sort events by timestamp" in {
@@ -476,21 +479,22 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
val pr3 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
writeTaggedEvent(t3, pr3, Set("T2"), 2, bucketSize)
- withProbe(queries.eventsByTag(tag = "T2", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(10)
-
- // simulate async eventually consistent Materialized View update
- // that cause p1-e2 to show up before p2-e1
- Thread.sleep(500)
- val t2 = t3.minus(1, ChronoUnit.MILLIS)
- val pr2 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
- writeTaggedEvent(t2, pr2, Set("T2"), 1, bucketSize)
+ withProbe(queries.eventsByTag(tag = "T2", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
- probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
- val e3 = probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
- (System.currentTimeMillis() - e3.timestamp) should be < 10000L
- })
+ // simulate async eventually consistent Materialized View update
+ // that cause p1-e2 to show up before p2-e1
+ Thread.sleep(500)
+ val t2 = t3.minus(1, ChronoUnit.MILLIS)
+ val pr2 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
+ writeTaggedEvent(t2, pr2, Set("T2"), 1, bucketSize)
+
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
+ val e3 = probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
+ (System.currentTimeMillis() - e3.timestamp) should be < 10000L
+ })
}
"stream many events" in {
@@ -604,21 +608,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
val p2e1 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
writeTaggedEvent(t2, p2e1, Set("T6"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T6", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
- probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
-
- // delayed, and timestamp is before p2-e1
- val t3 = t1.plusSeconds(1)
- val p1e2 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
- writeTaggedEvent(t3, p1e2, Set("T6"), 2, bucketSize)
- val p1e3 = PersistentRepr("p1-e3", 3L, "p1", "", writerUuid = w1)
- writeTaggedEvent(t2.plusSeconds(1), p1e3, Set("T6"), 3, bucketSize)
-
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
- probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "p1-e3") => e }
- })
+ withProbe(queries.eventsByTag(tag = "T6", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(10)
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
+
+ // delayed, and timestamp is before p2-e1
+ val t3 = t1.plusSeconds(1)
+ val p1e2 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
+ writeTaggedEvent(t3, p1e2, Set("T6"), 2, bucketSize)
+ val p1e3 = PersistentRepr("p1-e3", 3L, "p1", "", writerUuid = w1)
+ writeTaggedEvent(t2.plusSeconds(1), p1e3, Set("T6"), 3, bucketSize)
+
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "p1-e3") => e }
+ })
}
"find delayed events 2" in {
@@ -630,21 +635,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
writeTaggedEvent(t2, eventA1, Set("T7"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T7", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
+ withProbe(queries.eventsByTag(tag = "T7", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(10)
+ probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
- // delayed, timestamp is before A1
- val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
- writeTaggedEvent(t1, eventB1, Set("T7"), 1, bucketSize)
- // second delayed is after A1 so should be found and trigger a search for B1
- val t3 = t1.plusSeconds(2)
- val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
- writeTaggedEvent(t3, eventB2, Set("T7"), 2, bucketSize)
+ // delayed, timestamp is before A1
+ val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t1, eventB1, Set("T7"), 1, bucketSize)
+ // second delayed is after A1 so should be found and trigger a search for B1
+ val t3 = t1.plusSeconds(2)
+ val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t3, eventB2, Set("T7"), 2, bucketSize)
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e } // failed in travis
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
- })
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e } // failed in travis
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+ })
}
"find delayed events 3" in {
@@ -659,21 +665,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
writeTaggedEvent(t2, eventA1, Set("T8"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T8", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(10)
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
- probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
-
- // delayed, timestamp is before A1
- val eventB1 = PersistentRepr("B1", 2L, "b", "", writerUuid = w2)
- writeTaggedEvent(t1, eventB1, Set("T8"), 2, bucketSize)
- val t3 = t1.plusSeconds(2)
- val eventB2 = PersistentRepr("B2", 3L, "b", "", writerUuid = w2)
- writeTaggedEvent(t3, eventB2, Set("T8"), 3, bucketSize)
-
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B1") => e }
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 3L, "B2") => e }
- })
+ withProbe(queries.eventsByTag(tag = "T8", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(10)
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
+
+ // delayed, timestamp is before A1
+ val eventB1 = PersistentRepr("B1", 2L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t1, eventB1, Set("T8"), 2, bucketSize)
+ val t3 = t1.plusSeconds(2)
+ val eventB2 = PersistentRepr("B2", 3L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t3, eventB2, Set("T8"), 3, bucketSize)
+
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B1") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 3L, "B2") => e }
+ })
}
"find delayed events from offset" in {
@@ -684,25 +691,27 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
writeTaggedEvent(t1.plusSeconds(2), eventA1, Set("T9"), 1, bucketSize)
- withProbe(queries.eventsByTag(tag = "T9", offset = NoOffset).runWith(TestSink.probe[Any]), probe1 => {
- probe1.request(10)
- val offs =
- probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }.offset.asInstanceOf[TimeBasedUUID]
+ withProbe(queries.eventsByTag(tag = "T9", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe1 => {
+ probe1.request(10)
+ val offs =
+ probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }.offset.asInstanceOf[TimeBasedUUID]
- withProbe(queries.eventsByTag(tag = "T9", offset = offs).runWith(TestSink.probe[Any]), probe2 => {
- probe2.request(10)
+ withProbe(queries.eventsByTag(tag = "T9", offset = offs).runWith(TestSink.probe[Any]),
+ probe2 => {
+ probe2.request(10)
- // delayed, timestamp is before A1, i.e. before the offset so should not be picked up
- val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
- writeTaggedEvent(t1.plusSeconds(1), eventB1, Set("T9"), 1, bucketSize)
+ // delayed, timestamp is before A1, i.e. before the offset so should not be picked up
+ val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t1.plusSeconds(1), eventB1, Set("T9"), 1, bucketSize)
- // delayed, timestamp is after A1 so should be picked up
- val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
- writeTaggedEvent(t1.plusSeconds(3), eventB2, Set("T9"), 2, bucketSize)
+ // delayed, timestamp is after A1 so should be picked up
+ val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t1.plusSeconds(3), eventB2, Set("T9"), 2, bucketSize)
- probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+ probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+ })
})
- })
}
// Not supported atm as it requires us to back track without seeing a future event
@@ -717,29 +726,30 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
writeTaggedEvent(t1.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
}
- withProbe(queries.eventsByTag(tag = "T10", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(1000)
- probe.expectNextN(100)
+ withProbe(queries.eventsByTag(tag = "T10", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(1000)
+ probe.expectNextN(100)
- val t2 = t1.plusSeconds(1)
- (101L to 200L).foreach { n =>
- val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1)
- writeTaggedEvent(t2.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
- }
+ val t2 = t1.plusSeconds(1)
+ (101L to 200L).foreach { n =>
+ val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1)
+ writeTaggedEvent(t2.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
+ }
- // delayed, timestamp is before A101 but after A100
- val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
- writeTaggedEvent(t2.minus(100, ChronoUnit.MILLIS), eventB1, Set("T10"), 1, bucketSize)
+ // delayed, timestamp is before A101 but after A100
+ val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t2.minus(100, ChronoUnit.MILLIS), eventB1, Set("T10"), 1, bucketSize)
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e }
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e }
- // Now A101 - A200 can be delivered
- probe.expectNextN(100)
+ // Now A101 - A200 can be delivered
+ probe.expectNextN(100)
- val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
- writeTaggedEvent(t2.plusSeconds(1), eventB2, Set("T10"), 2, bucketSize)
- probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
- })
+ val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+ writeTaggedEvent(t2.plusSeconds(1), eventB2, Set("T10"), 2, bucketSize)
+ probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+ })
}
"find events from many persistenceIds" in {
@@ -815,10 +825,11 @@ class EventsByTagStrictBySeqNoEarlyFirstOffsetSpec
// the search for delayed events should start before we get to the current timebucket
// until 0.26/0.51 backtracking was broken and events would be skipped
- withProbe(queries.eventsByTag(tag = "T11", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(2000)
- probe.expectNextN(2000)
- })
+ withProbe(queries.eventsByTag(tag = "T11", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(2000)
+ probe.expectNextN(2000)
+ })
}
}
}
@@ -851,27 +862,30 @@ class EventsByTagLongRefreshIntervalSpec
sender.expectNoMessage(200.millis) // try and give time for the tagged event to be flushed so the query doesn't need to wait for the refresh interval
val offset: Offset =
- withProbe(queries.eventsByTag(tag = "animal", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
- probe.request(2)
- probe.expectNextPF {
- case EventEnvelope(offset, `pid`, 1L, "cat") =>
- offset
- }
- })
+ withProbe(queries.eventsByTag(tag = "animal", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(2)
+ probe.expectNextPF {
+ case EventEnvelope(offset, `pid`, 1L, "cat") =>
+ offset
+ }
+ })
pa.tell(Tagged("cat2", Set("animal")), sender.ref)
sender.expectMsg("cat2-done")
// flush interval for tag writes is 0ms but still give some time for the tag write to complete
sender.expectNoMessage(250.millis)
- withProbe(queries.eventsByTag(tag = "animal", offset = offset).runWith(TestSink.probe[Any]), probe => {
- probe.request(2)
- // less than the refresh interval, previously this would evaluate the new persistence-id timeout and then not re-evaluate
- // it again until the next refresh interval
- probe.expectNextWithTimeoutPF(2.seconds, {
- case EventEnvelope(_, `pid`, 2L, "cat2") =>
+ withProbe(queries.eventsByTag(tag = "animal", offset = offset).runWith(TestSink.probe[Any]),
+ probe => {
+ probe.request(2)
+ // less than the refresh interval, previously this would evaluate the new persistence-id timeout and then not re-evaluate
+ // it again until the next refresh interval
+ probe.expectNextWithTimeoutPF(2.seconds,
+ {
+ case EventEnvelope(_, `pid`, 2L, "cat2") =>
+ })
})
- })
}
}
@@ -1011,29 +1025,30 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even
writeTaggedEvent(t2, eventB, Set("T14"), n - 112, bucketSize)
}
- withProbe(queries.eventsByTag(tag = "T14", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
+ withProbe(queries.eventsByTag(tag = "T14", offset = NoOffset).runWith(TestSink.probe[Any]),
+ probe => {
- val requested1 = 130L
- probe.request(requested1)
- val expected1 = 100L + 12 * 2
- probe.expectNextN(expected1)
- probe.expectNoMessage(2.seconds)
+ val requested1 = 130L
+ probe.request(requested1)
+ val expected1 = 100L + 12 * 2
+ probe.expectNextN(expected1)
+ probe.expectNoMessage(2.seconds)
- system.log.debug("writing missing event, 113, and a bunch of delayed from C")
- (1L to 100L).foreach { n =>
- val eventC = PersistentRepr(s"C$n", n, "c", "", writerUuid = w3)
- val t = t1.plus(3 * n + 2, ChronoUnit.MILLIS)
- writeTaggedEvent(t, eventC, Set("T14"), n, bucketSize)
- }
- writeTaggedEvent(missingEventTime, missingEvent, Set("T14"), 101, bucketSize)
- val expected2 = requested1 - expected1
- probe.expectNextN(expected2)
- probe.expectNoMessage(200.millis)
-
- probe.request(1000)
- probe.expectNextN(8 + 100 - expected2)
- probe.expectNoMessage(200.millis)
- })
+ system.log.debug("writing missing event, 113, and a bunch of delayed from C")
+ (1L to 100L).foreach { n =>
+ val eventC = PersistentRepr(s"C$n", n, "c", "", writerUuid = w3)
+ val t = t1.plus(3 * n + 2, ChronoUnit.MILLIS)
+ writeTaggedEvent(t, eventC, Set("T14"), n, bucketSize)
+ }
+ writeTaggedEvent(missingEventTime, missingEvent, Set("T14"), 101, bucketSize)
+ val expected2 = requested1 - expected1
+ probe.expectNextN(expected2)
+ probe.expectNoMessage(200.millis)
+
+ probe.request(1000)
+ probe.expectNextN(8 + 100 - expected2)
+ probe.expectNoMessage(200.millis)
+ })
}
"find all events" in {
diff --git a/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala b/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
index 487f409..df7c4a2 100644
--- a/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
@@ -52,8 +52,8 @@ class CassandraLauncherSpec
CassandraLauncher.classpathForResources("logback-test.xml"))
awaitAssert({
- testCassandra()
- }, 45.seconds)
+ testCassandra()
+ }, 45.seconds)
CassandraLauncher.stop()
diff --git a/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala b/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
index a7c7a72..913950c 100644
--- a/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
+++ b/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
@@ -15,7 +15,7 @@ import akka.persistence.cassandra.reconciler.Reconciliation
class AllPersistenceIdsMigrationCompileOnly {
- //#migrate
+ // #migrate
// System should have the same Cassandra plugin configuration as your application
// but be careful to remove seed nodes so this doesn't join the cluster
val system = ActorSystem()
@@ -32,5 +32,5 @@ class AllPersistenceIdsMigrationCompileOnly {
system.log.error(e, "All persistenceIds migration failed.")
system.terminate()
}
- //#migrate
+ // #migrate
}
diff --git a/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala b/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
index af8b63b..8c321bd 100644
--- a/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
+++ b/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
@@ -14,7 +14,7 @@ import akka.Done
class ReconciliationCompileOnly {
- //#reconcile
+ // #reconcile
// System should have the same Cassandra plugin configuration as your application
// but be careful to remove seed nodes so this doesn't join the cluster
val system = ActorSystem()
@@ -32,5 +32,5 @@ class ReconciliationCompileOnly {
// optional: re-build, if this is ommited then it will be re-build next time the pid is started
_ <- rec.rebuildTagViewForPersistenceIds(pid)
} yield Done
- //#reconcile
+ // #reconcile
}
diff --git a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
index 1a68ec9..103d25d 100644
--- a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
+++ b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
@@ -12,7 +12,7 @@ object CleanupDocExample {
implicit val system: ActorSystem = ???
- //#cleanup
+ // #cleanup
val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val cleanup = new Cleanup(system)
@@ -31,6 +31,6 @@ object CleanupDocExample {
.mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant.toEpochMilli))
.run()
- //#cleanup
+ // #cleanup
}
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
index 2ea8d58..a50e3e6 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
@@ -38,8 +38,8 @@ class EventProcessorStream[Event: ClassTag](
readOffset().map { offset =>
log.infoN("Starting stream for tag [{}] from offset [{}]", tag, offset)
processEventsByTag(offset, histogram)
- // groupedWithin can be used here to improve performance by reducing number of offset writes,
- // with the trade-off of possibility of more duplicate events when stream is restarted
+ // groupedWithin can be used here to improve performance by reducing number of offset writes,
+ // with the trade-off of possibility of more duplicate events when stream is restarted
.mapAsync(1)(writeOffset)
}
}
@@ -52,21 +52,21 @@ class EventProcessorStream[Event: ClassTag](
query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope =>
eventEnvelope.event match {
case event: Event => {
- // Times from different nodes, take with a pinch of salt
- val latency = System.currentTimeMillis() - eventEnvelope.timestamp
- // when restarting without the offset the latency will be too big
- if (latency < histogram.getMaxValue) {
- histogram.recordValue(latency)
- }
- log.debugN(
- "Tag {} Event {} persistenceId {}, sequenceNr {}. Latency {}",
- tag,
- event,
- PersistenceId.ofUniqueId(eventEnvelope.persistenceId),
- eventEnvelope.sequenceNr,
- latency)
- Future.successful(Done)
- }.map(_ => eventEnvelope.offset)
+ // Times from different nodes, take with a pinch of salt
+ val latency = System.currentTimeMillis() - eventEnvelope.timestamp
+ // when restarting without the offset the latency will be too big
+ if (latency < histogram.getMaxValue) {
+ histogram.recordValue(latency)
+ }
+ log.debugN(
+ "Tag {} Event {} persistenceId {}, sequenceNr {}. Latency {}",
+ tag,
+ event,
+ PersistenceId.ofUniqueId(eventEnvelope.persistenceId),
+ eventEnvelope.sequenceNr,
+ latency)
+ Future.successful(Done)
+ }.map(_ => eventEnvelope.offset)
case other =>
Future.failed(new IllegalArgumentException(s"Unexpected event [${other.getClass.getName}]"))
}
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
index 8809898..183b90f 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
@@ -18,22 +18,22 @@ object Main {
def main(args: Array[String]): Unit = {
ActorSystem(Behaviors.setup[SelfUp] {
- ctx =>
- val readSettings = ReadSide.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
- val writeSettings = ConfigurablePersistentActor.Settings(readSettings.nrTags)
- val loadSettings = LoadGenerator.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
+ ctx =>
+ val readSettings = ReadSide.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
+ val writeSettings = ConfigurablePersistentActor.Settings(readSettings.nrTags)
+ val loadSettings = LoadGenerator.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
- AkkaManagement(ctx.system).start()
- ClusterBootstrap(ctx.system).start()
- val cluster = Cluster(ctx.system)
- cluster.subscriptions ! Subscribe(ctx.self, classOf[SelfUp])
+ AkkaManagement(ctx.system).start()
+ ClusterBootstrap(ctx.system).start()
+ val cluster = Cluster(ctx.system)
+ cluster.subscriptions ! Subscribe(ctx.self, classOf[SelfUp])
- val topic = ReadSideTopic.init(ctx)
+ val topic = ReadSideTopic.init(ctx)
- if (cluster.selfMember.hasRole("read")) {
- val session = CassandraSessionRegistry(ctx.system).sessionFor("akka.persistence.cassandra")
- val offsetTableStmt =
- """
+ if (cluster.selfMember.hasRole("read")) {
+ val session = CassandraSessionRegistry(ctx.system).sessionFor("akka.persistence.cassandra")
+ val offsetTableStmt =
+ """
CREATE TABLE IF NOT EXISTS akka.offsetStore (
eventProcessorId text,
tag text,
@@ -42,27 +42,27 @@ object Main {
)
"""
- Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
- }
+ Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
+ }
- Behaviors.receiveMessage {
- case SelfUp(state) =>
- ctx.log.infoN(
- "Cluster member joined. Initializing persistent actors. Roles {}. Members {}",
- cluster.selfMember.roles,
- state.members)
- val ref = ConfigurablePersistentActor.init(writeSettings, ctx.system)
- if (cluster.selfMember.hasRole("read")) {
- ctx.spawnAnonymous(Reporter(topic))
- }
- ReadSide(ctx.system, topic, readSettings)
- if (cluster.selfMember.hasRole("load")) {
- ctx.log.info("Starting load generation")
- val load = ctx.spawn(LoadGenerator(loadSettings, ref), "load-generator")
- load ! Start(10.seconds)
- }
- Behaviors.empty
- }
- }, "apc-example")
+ Behaviors.receiveMessage {
+ case SelfUp(state) =>
+ ctx.log.infoN(
+ "Cluster member joined. Initializing persistent actors. Roles {}. Members {}",
+ cluster.selfMember.roles,
+ state.members)
+ val ref = ConfigurablePersistentActor.init(writeSettings, ctx.system)
+ if (cluster.selfMember.hasRole("read")) {
+ ctx.spawnAnonymous(Reporter(topic))
+ }
+ ReadSide(ctx.system, topic, readSettings)
+ if (cluster.selfMember.hasRole("load")) {
+ ctx.log.info("Starting load generation")
+ val load = ctx.spawn(LoadGenerator(loadSettings, ref), "load-generator")
+ load ! Start(10.seconds)
+ }
+ Behaviors.empty
+ }
+ }, "apc-example")
}
}
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
index 91ca0d8..81f64d4 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
@@ -47,7 +47,7 @@ object ReadSide {
Behaviors.withTimers { timers =>
timers.startTimerAtFixedRate(ReportMetrics, 10.second)
Behaviors.setup { ctx =>
- val start = (settings.tagsPerProcessor * nr)
+ val start = settings.tagsPerProcessor * nr
val end = start + (settings.tagsPerProcessor) - 1
val tags = (start to end).map(i => s"tag-$i")
ctx.log.info("Processor {} processing tags {}", nr, tags)
@@ -59,13 +59,12 @@ object ReadSide {
// having more tags will also increase write throughput/latency as it'll write to
// many partitions
// downside is running many streams/queries against c*
- tags.foreach(
- tag =>
- new EventProcessorStream[ConfigurablePersistentActor.Event](
- ctx.system,
- ctx.executionContext,
- s"processor-$nr",
- tag).runQueryStream(killSwitch, histogram))
+ tags.foreach(tag =>
+ new EventProcessorStream[ConfigurablePersistentActor.Event](
+ ctx.system,
+ ctx.executionContext,
+ s"processor-$nr",
+ tag).runQueryStream(killSwitch, histogram))
Behaviors
.receiveMessage[Command] {
diff --git a/project/Common.scala b/project/Common.scala
index 4566f7d..83e9176 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -22,14 +22,14 @@ object Common extends AutoPlugin {
homepage := Some(url("https://akka.io")),
// apiURL defined in projectSettings because version.value is not correct here
scmInfo := Some(
- ScmInfo(
- url("https://github.com/akka/akka-persistence-cassandra"),
- "git@github.com:akka/akka-persistence-cassandra.git")),
+ ScmInfo(
+ url("https://github.com/akka/akka-persistence-cassandra"),
+ "git@github.com:akka/akka-persistence-cassandra.git")),
developers += Developer(
- "contributors",
- "Contributors",
- "https://gitter.im/akka/dev",
- url("https://github.com/akka/akka-persistence-cassandra/graphs/contributors")),
+ "contributors",
+ "Contributors",
+ "https://gitter.im/akka/dev",
+ url("https://github.com/akka/akka-persistence-cassandra/graphs/contributors")),
licenses := Seq(("Apache-2.0", url("https://www.apache.org/licenses/LICENSE-2.0"))),
description := "A Cassandra plugin for Akka Persistence.")
@@ -41,27 +41,27 @@ object Common extends AutoPlugin {
scalacOptions ++= Seq("-encoding", "UTF-8", "-feature", "-unchecked", "-Xlint", "-Ywarn-dead-code", "-deprecation"),
Compile / console / scalacOptions --= Seq("-deprecation", "-Xfatal-warnings", "-Xlint", "-Ywarn-unused:imports"),
Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
- "-doc-title",
- "Akka Persistence Cassandra",
- "-doc-version",
- version.value,
- "-sourcepath",
- (ThisBuild / baseDirectory).value.toString,
- "-doc-source-url", {
- val branch = if (isSnapshot.value) "master" else s"v${version.value}"
- s"https://github.com/akka/akka-persistence-cassandra/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
- },
- "-doc-canonical-base-url",
- "https://doc.akka.io/api/akka-persistence-cassandra/current/",
- "-skip-packages",
- "akka.pattern" // for some reason Scaladoc creates this
- ),
+ "-doc-title",
+ "Akka Persistence Cassandra",
+ "-doc-version",
+ version.value,
+ "-sourcepath",
+ (ThisBuild / baseDirectory).value.toString,
+ "-doc-source-url", {
+ val branch = if (isSnapshot.value) "master" else s"v${version.value}"
+ s"https://github.com/akka/akka-persistence-cassandra/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
+ },
+ "-doc-canonical-base-url",
+ "https://doc.akka.io/api/akka-persistence-cassandra/current/",
+ "-skip-packages",
+ "akka.pattern" // for some reason Scaladoc creates this
+ ),
Compile / doc / scalacOptions --= Seq("-Xfatal-warnings"),
scalafmtOnCompile := true,
autoAPIMappings := true,
apiURL := Some(url(s"https://doc.akka.io/api/akka-persistence-cassandra/${projectInfoVersion.value}")),
headerLicense := Some(
- HeaderLicense.Custom("""Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>""")),
+ HeaderLicense.Custom("""Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>""")),
sonatypeProfileName := "com.typesafe",
Test / logBuffered := System.getProperty("akka.logBufferedTests", "false").toBoolean,
// show full stack traces and test case durations
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6e26982..da1e3a4 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -36,15 +36,15 @@ object Dependencies {
"com.typesafe.akka" %% "akka-cluster-sharding")
val akkaPersistenceCassandraDependencies = Seq(
- "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % AlpakkaVersion,
- "com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
- "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
- "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
- "org.scala-lang.modules" %% "scala-collection-compat" % "2.4.4",
- Logback % Test,
- "org.scalatest" %% "scalatest" % "3.2.11" % Test,
- "org.pegdown" % "pegdown" % "1.6.0" % Test,
- "org.osgi" % "org.osgi.core" % "5.0.0" % Provided) ++ akkaTestDeps.map(_ % AkkaVersion % Test)
+ "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % AlpakkaVersion,
+ "com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
+ "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
+ "org.scala-lang.modules" %% "scala-collection-compat" % "2.4.4",
+ Logback % Test,
+ "org.scalatest" %% "scalatest" % "3.2.11" % Test,
+ "org.pegdown" % "pegdown" % "1.6.0" % Test,
+ "org.osgi" % "org.osgi.core" % "5.0.0" % Provided) ++ akkaTestDeps.map(_ % AkkaVersion % Test)
val exampleDependencies = Seq(
Logback,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org