You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/11/15 19:38:19 UTC

[incubator-pekko-persistence-cassandra] 03/07: format source with scalafmt, #2

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit 9b6b9f1850463f8061545cccc869b848154344dd
Author: Auto Format <nobody>
AuthorDate: Sat Nov 12 07:40:57 2022 +0100

    format source with scalafmt, #2
---
 build.sbt                                          |  76 ++---
 .../cassandra/testkit/CassandraLauncher.scala      |  23 +-
 .../cassandra/EventsByTagMigration.scala           |   1 -
 .../cassandra/KeyspaceAndTableStatements.scala     |   1 -
 .../persistence/cassandra/cleanup/Cleanup.scala    |   1 -
 .../compaction/BaseCompactionStrategy.scala        |   3 +-
 .../compaction/LeveledCompactionStrategy.scala     |   3 +-
 .../compaction/SizeTieredCompactionStrategy.scala  |   2 +-
 .../persistence/cassandra/journal/Buffer.scala     |   4 +-
 .../cassandra/journal/CassandraJournal.scala       |  17 +-
 .../journal/CassandraJournalStatements.scala       |   4 +-
 .../cassandra/journal/CassandraTagRecovery.scala   |  17 +-
 .../cassandra/journal/PubSubThrottler.scala        |   2 +-
 .../persistence/cassandra/journal/TagWriter.scala  |   2 +-
 .../persistence/cassandra/journal/TagWriters.scala |   7 +-
 .../query/EventsByPersistenceIdStage.scala         |   1 -
 .../cassandra/query/EventsByTagStage.scala         |  25 +-
 .../query/TagViewSequenceNumberScanner.scala       |   4 +-
 .../query/javadsl/CassandraReadJournal.scala       |   2 -
 .../akka/persistence/cassandra/query/package.scala |   8 +-
 .../query/scaladsl/CassandraReadJournal.scala      |   5 +-
 .../snapshot/CassandraSnapshotStore.scala          |  12 +-
 .../cassandra/MultiNodeClusterSpec.scala           |  87 +++--
 .../cassandra/CassandraEventsByTagLoadSpec.scala   |  21 +-
 .../akka/persistence/cassandra/CassandraSpec.scala |   6 +-
 .../cassandra/EventsByTagMigrationSpec.scala       |  32 +-
 .../akka/persistence/cassandra/RetriesSpec.scala   |   6 +-
 .../cassandra/cleanup/CleanupSpec.scala            |  22 +-
 .../CassandraCompactionStrategySpec.scala          |   8 +-
 .../cassandra/query/EventAdaptersReadSpec.scala    |  45 +--
 .../cassandra/query/EventsByTagSpec.scala          | 365 +++++++++++----------
 .../cassandra/testkit/CassandraLauncherSpec.scala  |   4 +-
 .../AllPersistenceIdsMigrationCompileOnly.scala    |   4 +-
 .../doc/reconciler/ReconciliationCompileOnly.scala |   4 +-
 .../test/scala/doc/cleanup/CleanupDocExample.scala |   4 +-
 .../cassandra/example/EventProcessorStream.scala   |  34 +-
 .../akka/persistence/cassandra/example/Main.scala  |  68 ++--
 .../persistence/cassandra/example/ReadSide.scala   |  15 +-
 project/Common.scala                               |  46 +--
 project/Dependencies.scala                         |  18 +-
 40 files changed, 513 insertions(+), 496 deletions(-)

diff --git a/build.sbt b/build.sbt
index 1e39948..512dd39 100644
--- a/build.sbt
+++ b/build.sbt
@@ -27,7 +27,7 @@ lazy val core = project
     name := "akka-persistence-cassandra",
     libraryDependencies ++= Dependencies.akkaPersistenceCassandraDependencies,
     Compile / packageBin / packageOptions += Package.ManifestAttributes(
-        "Automatic-Module-Name" -> "akka.persistence.cassandra"))
+      "Automatic-Module-Name" -> "akka.persistence.cassandra"))
   .configs(MultiJvm)
 
 lazy val cassandraLauncher = project
@@ -48,7 +48,7 @@ lazy val cassandraBundle = project
     crossPaths := false,
     autoScalaLibrary := false,
     libraryDependencies += ("org.apache.cassandra" % "cassandra-all" % "3.11.3")
-        .exclude("commons-logging", "commons-logging"),
+      .exclude("commons-logging", "commons-logging"),
     dependencyOverrides += "com.github.jbellis" % "jamm" % "0.3.3", // See jamm comment in https://issues.apache.org/jira/browse/CASSANDRA-9608
     assembly / target := target.value / "bundle" / "akka" / "persistence" / "cassandra" / "launcher",
     assembly / assemblyJarName := "cassandra-bundle.jar")
@@ -69,19 +69,19 @@ lazy val endToEndExample = project
     dockerUsername := Some("kubakka"),
     dockerUpdateLatest := true,
     // update if deploying to some where that can't see docker hu
-    //dockerRepository := Some("some-registry"),
+    // dockerRepository := Some("some-registry"),
     dockerCommands ++= Seq(
-        Cmd("USER", "root"),
-        Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", "busybox-extras", "curl", "iptables"),
-        Cmd(
-          "RUN",
-          "/sbin/apk",
-          "add",
-          "--no-cache",
-          "jattach",
-          "--repository",
-          "http://dl-cdn.alpinelinux.org/alpine/edge/community/"),
-        Cmd("RUN", "chgrp -R 0 . && chmod -R g=u .")),
+      Cmd("USER", "root"),
+      Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", "busybox-extras", "curl", "iptables"),
+      Cmd(
+        "RUN",
+        "/sbin/apk",
+        "add",
+        "--no-cache",
+        "jattach",
+        "--repository",
+        "http://dl-cdn.alpinelinux.org/alpine/edge/community/"),
+      Cmd("RUN", "chgrp -R 0 . && chmod -R g=u .")),
     // Docker image is only for running in k8s
     Universal / javaOptions ++= Seq("-J-Dconfig.resource=kubernetes.conf"))
   .enablePlugins(DockerPlugin, JavaAppPackaging)
@@ -104,30 +104,30 @@ lazy val docs = project
     Preprocess / sourceDirectory := (LocalRootProject / ScalaUnidoc / unidoc / target).value,
     Paradox / siteSubdirName := s"docs/akka-persistence-cassandra/${projectInfoVersion.value}",
     Compile / paradoxProperties ++= Map(
-        "project.url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current/",
-        "canonical.base_url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current",
-        "akka.version" -> Dependencies.AkkaVersion,
-        // Akka
-        "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
-        "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
-        "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
-        // Alpakka
-        "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
-        "scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
-        "javadoc.akka.stream.alpakka.base_url" -> "",
-        // APC 0.x
-        "extref.apc-0.x.base_url" -> s"https://doc.akka.io/docs/akka-persistence-cassandra/0.103/%s",
-        // Cassandra
-        "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
-        // Datastax Java driver
-        "extref.java-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.DriverVersionInDocs}/%s",
-        "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.DriverVersionInDocs}/",
-        // Java
-        "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
-        // Scala
-        "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
-        "scaladoc.akka.persistence.cassandra.base_url" -> s"/${(Preprocess / siteSubdirName).value}/",
-        "javadoc.akka.persistence.cassandra.base_url" -> ""), // no Javadoc is published
+      "project.url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current/",
+      "canonical.base_url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current",
+      "akka.version" -> Dependencies.AkkaVersion,
+      // Akka
+      "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
+      "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
+      "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
+      // Alpakka
+      "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
+      "scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
+      "javadoc.akka.stream.alpakka.base_url" -> "",
+      // APC 0.x
+      "extref.apc-0.x.base_url" -> s"https://doc.akka.io/docs/akka-persistence-cassandra/0.103/%s",
+      // Cassandra
+      "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
+      // Datastax Java driver
+      "extref.java-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.DriverVersionInDocs}/%s",
+      "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.DriverVersionInDocs}/",
+      // Java
+      "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
+      // Scala
+      "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
+      "scaladoc.akka.persistence.cassandra.base_url" -> s"/${(Preprocess / siteSubdirName).value}/",
+      "javadoc.akka.persistence.cassandra.base_url" -> ""), // no Javadoc is published
     paradoxGroups := Map("Language" -> Seq("Java", "Scala")),
     ApidocPlugin.autoImport.apidocRootPackage := "akka",
     resolvers += Resolver.jcenterRepo,
diff --git a/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala b/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
index 5e4fc35..ff82c8c 100644
--- a/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
+++ b/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
@@ -260,7 +260,7 @@ object CassandraLauncher {
 
       println(
         s"Starting Cassandra on port client port: $realPort storage port $storagePort host $realHost java version ${System
-          .getProperty("java.runtime.version")}")
+            .getProperty("java.runtime.version")}")
 
       // http://wiki.apache.org/cassandra/StorageConfiguration
       val conf = readResource(configResource)
@@ -400,16 +400,17 @@ object CassandraLauncher {
     val deadline = AwaitListenTimeout.fromNow
     @annotation.tailrec
     def tryConnect(): Unit = {
-      val retry = try {
-        new Socket(host, port).close()
-        false
-      } catch {
-        case _: IOException if deadline.hasTimeLeft() =>
-          Thread.sleep(AwaitListenPoll.toMillis)
-          true
-        case ioe: IOException =>
-          throw new RuntimeException(s"Cassandra did not start within $AwaitListenTimeout", ioe)
-      }
+      val retry =
+        try {
+          new Socket(host, port).close()
+          false
+        } catch {
+          case _: IOException if deadline.hasTimeLeft() =>
+            Thread.sleep(AwaitListenPoll.toMillis)
+            true
+          case ioe: IOException =>
+            throw new RuntimeException(s"Cassandra did not start within $AwaitListenTimeout", ioe)
+        }
       if (retry) tryConnect()
     }
     tryConnect()
diff --git a/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala b/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
index b063d05..390bc3a 100644
--- a/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
@@ -68,7 +68,6 @@ object EventsByTagMigration {
 }
 
 /**
- *
  * @param pluginConfigPath The config namespace where the plugin is configured, default is `akka.persistence.cassandra`
  */
 class EventsByTagMigration(
diff --git a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
index e1394d9..9586da3 100644
--- a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
@@ -31,7 +31,6 @@ class KeyspaceAndTableStatements(
    *
    * This can be queried in for example a startup script without accessing the actual
    * Cassandra plugin actor.
-   *
    */
   def createJournalTablesStatements: immutable.Seq[String] =
     journalStatements.createTable ::
diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
index aeb2618..b61cb09 100644
--- a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
@@ -131,7 +131,6 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
    * snapshot.
    *
    * @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots
-   *
    */
   def deleteBeforeSnapshot(
       persistenceId: String,
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
index 71841bb..7f2239a 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
@@ -16,7 +16,8 @@ abstract class BaseCompactionStrategy(config: Config, className: String, propert
   require(config.hasPath("class") && config.getString("class") == className, s"Config does not specify a $className")
   require(
     config.entrySet().asScala.map(_.getKey).forall(propertyKeys.contains(_)),
-    s"Config contains properties not supported by a $className. Supported: $propertyKeys. Supplied: ${config.entrySet().asScala.map(_.getKey)}")
+    s"Config contains properties not supported by a $className. Supported: $propertyKeys. Supplied: ${config.entrySet().asScala.map(
+        _.getKey)}")
 
   val enabled: Boolean =
     if (config.hasPath("enabled")) config.getBoolean("enabled") else true
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
index a4be050..db7c05f 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
@@ -10,7 +10,8 @@ import com.typesafe.config.Config
  * https://github.com/apache/cassandra/blob/cassandra-2.2/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
  */
 class LeveledCompactionStrategy(config: Config)
-    extends BaseCompactionStrategy(config, LeveledCompactionStrategy.ClassName, LeveledCompactionStrategy.propertyKeys) {
+    extends BaseCompactionStrategy(config, LeveledCompactionStrategy.ClassName,
+      LeveledCompactionStrategy.propertyKeys) {
   val ssTableSizeInMB: Long =
     if (config.hasPath("sstable_size_in_mb"))
       config.getLong("sstable_size_in_mb")
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
index fb02ff1..d6108de 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
@@ -51,7 +51,7 @@ object SizeTieredCompactionStrategy extends CassandraCompactionStrategyConfig[Si
 
   override def propertyKeys: List[String] =
     (BaseCompactionStrategy.propertyKeys ++
-    List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted
+      List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted
 
   override def fromConfig(config: Config): SizeTieredCompactionStrategy =
     new SizeTieredCompactionStrategy(config)
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
index 0e8d2c8..2c2aa2b 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
@@ -65,8 +65,8 @@ private[akka] case class Buffer(
         // add them to pending, any time bucket changes will be detected later
         copy(size = newSize, pending = pending :+ write)
       } else if (nextBatch.headOption.exists(oldestEvent =>
-                   UUIDComparator.comparator
-                     .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
+          UUIDComparator.comparator
+            .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
         // rare case where events have been received out of order, just re-build the buffer
         require(pending.isEmpty)
         val allWrites = (nextBatch :+ write).sortBy(_.events.head._1.timeUuid)(timeUuidOrdering)
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
index 9f6de3b..cb601de 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
@@ -6,7 +6,7 @@ package akka.persistence.cassandra.journal
 
 import java.lang.{ Long => JLong }
 import java.nio.ByteBuffer
-import java.util.{ UUID, HashMap => JHMap, Map => JMap }
+import java.util.{ HashMap => JHMap, Map => JMap, UUID }
 
 import akka.Done
 import akka.actor.SupervisorStrategy.Stop
@@ -266,7 +266,7 @@ import akka.stream.scaladsl.Source
             writeMessages(serialized)
           } else {
 
-            //if presistAll was used, single AtomicWrite can already contain complete batch, so we need to regroup writes correctly
+            // if presistAll was used, single AtomicWrite can already contain complete batch, so we need to regroup writes correctly
             val groups: List[List[SerializedAtomicWrite]] = groupedWrites(serialized.toList.reverse, Nil, Nil)
 
             // execute the groups in sequence
@@ -301,7 +301,7 @@ import akka.stream.scaladsl.Source
     toReturn
   }
 
-  //Regroup batches by payload size
+  // Regroup batches by payload size
   @tailrec
   private def groupedWrites(
       reversed: List[SerializedAtomicWrite],
@@ -639,12 +639,11 @@ import akka.stream.scaladsl.Source
     val boundSelectHighestSequenceNr = preparedSelectHighestSequenceNr.map(_.bind(persistenceId, partitionNr: JLong))
     boundSelectHighestSequenceNr
       .flatMap(selectOne)
-      .map(
-        row =>
-          row
-            .map(s =>
-              PartitionInfo(partitionNr, minSequenceNr(partitionNr), math.min(s.getLong("sequence_nr"), maxSequenceNr)))
-            .getOrElse(PartitionInfo(partitionNr, minSequenceNr(partitionNr), -1)))
+      .map(row =>
+        row
+          .map(s =>
+            PartitionInfo(partitionNr, minSequenceNr(partitionNr), math.min(s.getLong("sequence_nr"), maxSequenceNr)))
+          .getOrElse(PartitionInfo(partitionNr, minSequenceNr(partitionNr), -1)))
   }
 
   private def asyncHighestDeletedSequenceNumber(persistenceId: String): Future[Long] = {
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
index dc693c9..06e3604 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
@@ -76,8 +76,8 @@ import akka.persistence.cassandra.FutureDone
       |  WITH gc_grace_seconds =${eventsByTagSettings.tagTable.gcGraceSeconds}
       |  AND compaction = ${indent(eventsByTagSettings.tagTable.compactionStrategy.asCQL, "    ")}
       |  ${if (eventsByTagSettings.tagTable.ttl.isDefined)
-         "AND default_time_to_live = " + eventsByTagSettings.tagTable.ttl.get.toSeconds
-       else ""}
+        "AND default_time_to_live = " + eventsByTagSettings.tagTable.ttl.get.toSeconds
+      else ""}
     """.stripMargin.trim
 
   def createTagsProgressTable: String =
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
index a444f64..6ea4c40 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
@@ -85,15 +85,14 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
     else {
       val completed: List[Future[Done]] =
         tpr.tags.toList
-          .map(
-            tag =>
-              tag -> serializeEvent(
-                tpr.pr,
-                tpr.tags,
-                tpr.offset,
-                settings.eventsByTagSettings.bucketSize,
-                serialization,
-                system))
+          .map(tag =>
+            tag -> serializeEvent(
+              tpr.pr,
+              tpr.tags,
+              tpr.offset,
+              settings.eventsByTagSettings.bucketSize,
+              serialization,
+              system))
           .map {
             case (tag, serializedFut) =>
               serializedFut.map { serialized =>
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala b/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
index 0cc9a39..a947ade 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
@@ -27,7 +27,7 @@ import akka.annotation.InternalApi
   def receive = {
     case Tick =>
       for ((msg, clients) <- repeated;
-           client <- clients) {
+        client <- clients) {
         delegate.tell(msg, client)
       }
       seen.clear()
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
index c74630a..93b81f2 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
@@ -344,7 +344,7 @@ import scala.util.{ Failure, Success, Try }
               throw new IllegalStateException(
                 s"Expected events to be ordered by seqNr. ${event.persistenceId} " +
                 s"Events: ${writes.nextBatch.map(e =>
-                  (e.events.head._1.persistenceId, e.events.head._1.sequenceNr, e.events.head._1.timeUuid))}")
+                    (e.events.head._1.persistenceId, e.events.head._1.sequenceNr, e.events.head._1.timeUuid))}")
 
             acc + (event.persistenceId -> PidProgress(from, event.sequenceNr, tagPidSequenceNr, event.timeUuid))
           case None =>
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
index f5b1c14..4374cd9 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
@@ -106,10 +106,9 @@ import scala.util.Try
     def writeProgress(tag: Tag, persistenceId: String, seqNr: Long, tagPidSequenceNr: Long, offset: UUID)(
         implicit ec: ExecutionContext): Future[Done] = {
       WriteTagProgress
-        .map(
-          ps =>
-            ps.bind(persistenceId, tag, seqNr: JLong, tagPidSequenceNr: JLong, offset)
-              .setExecutionProfileName(writeProfile))
+        .map(ps =>
+          ps.bind(persistenceId, tag, seqNr: JLong, tagPidSequenceNr: JLong, offset)
+            .setExecutionProfileName(writeProfile))
         .flatMap(executeWrite)
     }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index edcfe8e..f8d1471 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -452,7 +452,6 @@ import akka.persistence.cassandra.PluginSettings
             }
 
           case QueryIdle | _: QueryInProgress | _: QueryResult => // ok
-
         }
       }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
index ddc3882..ecbc797 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
@@ -99,10 +99,10 @@ import scala.compat.java8.FutureConverters._
         implicit ec: ExecutionContext,
         scheduler: Scheduler): Future[AsyncResultSet] = {
       Retries.retry({ () =>
-        val bound =
-          statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
-        session.executeAsync(bound).toScala
-      }, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
+          val bound =
+            statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
+          session.executeAsync(bound).toScala
+        }, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
     }
   }
 
@@ -121,7 +121,6 @@ import scala.compat.java8.FutureConverters._
   final case class MissingData(maxOffset: UUID, maxSequenceNr: TagPidSequenceNr)
 
   /**
-   *
    * @param queryPrevious Should the previous bucket be queries right away. Searches go back one bucket, first the current bucket then
    *                      the previous bucket. This is repeated every refresh interval.
    * @param gapDetected Whether an explicit gap has been detected e.g. events 1-4 have been seen then the next event is not 5.
@@ -190,7 +189,8 @@ import scala.compat.java8.FutureConverters._
 
     // override to give nice offset formatting
     override def toString: String =
-      s"""StageState(state: $state, fromOffset: ${formatOffset(fromOffset)}, toOffset: ${formatOffset(toOffset)}, tagPidSequenceNrs: $tagPidSequenceNrs, missingLookup: $missingLookup, bucketSize: $bucketSize)"""
+      s"""StageState(state: $state, fromOffset: ${formatOffset(fromOffset)}, toOffset: ${formatOffset(
+          toOffset)}, tagPidSequenceNrs: $tagPidSequenceNrs, missingLookup: $missingLookup, bucketSize: $bucketSize)"""
   }
 
   private val uuidRowOrdering = new Ordering[UUIDRow] {
@@ -377,9 +377,10 @@ import scala.compat.java8.FutureConverters._
       }
 
       override def preStart(): Unit = {
-        stageState = StageState(QueryIdle, initialQueryOffset, calculateToOffset(), initialTagPidSequenceNrs.transform {
-          case (_, (tagPidSequenceNr, offset)) => (tagPidSequenceNr, offset, System.currentTimeMillis())
-        }, delayedScanInProgress = false, System.currentTimeMillis(), None, bucketSize)
+        stageState = StageState(QueryIdle, initialQueryOffset, calculateToOffset(),
+          initialTagPidSequenceNrs.transform {
+            case (_, (tagPidSequenceNr, offset)) => (tagPidSequenceNr, offset, System.currentTimeMillis())
+          }, delayedScanInProgress = false, System.currentTimeMillis(), None, bucketSize)
         if (log.isInfoEnabled) {
           log.info(
             s"[{}]: EventsByTag query [${session.tag}] starting with EC delay {}ms: fromOffset [{}] toOffset [{}]",
@@ -525,7 +526,8 @@ import scala.compat.java8.FutureConverters._
           .selectEventsForBucket(
             stageState.currentTimeBucket,
             stageState.fromOffset,
-            stageState.toOffset, { (attempt, t, nextRetry) =>
+            stageState.toOffset,
+            { (attempt, t, nextRetry) =>
               if (log.isWarningEnabled) {
                 log.warning(
                   s"[{}] Query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ${attempt}. Next retry in: ${nextRetry.pretty}. Reason: ${t.getMessage}",
@@ -566,7 +568,8 @@ import scala.compat.java8.FutureConverters._
             .selectEventsForBucket(
               missing.bucket,
               missing.minOffset,
-              missing.maxOffset, { (attempt, t, nextRetry) =>
+              missing.maxOffset,
+              { (attempt, t, nextRetry) =>
                 if (log.isWarningEnabled) {
                   log.warning(
                     s"[{}] Looking for missing query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ${attempt}. Next retry in: ${nextRetry.pretty}. Reason: ${t.getMessage}",
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
index 8102598..60b34f1 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
@@ -67,8 +67,8 @@ import akka.stream.scaladsl.Sink
       toOffset: UUID,
       bucketSize: BucketSize,
       scanningPeriod: FiniteDuration,
-      whichToKeep: (TagPidSequenceNr, TagPidSequenceNr) => TagPidSequenceNr)
-      : Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
+      whichToKeep: (TagPidSequenceNr,
+          TagPidSequenceNr) => TagPidSequenceNr): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
 
     def doIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
index 8b90198..216f47e 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
@@ -44,7 +44,6 @@ object CassandraReadJournal {
  * Configuration settings can be defined in the configuration section with the
  * absolute path corresponding to the identifier, which is `"akka.persistence.cassandra.query"`
  * for the default [[CassandraReadJournal#Identifier]]. See `reference.conf`.
- *
  */
 class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query.scaladsl.CassandraReadJournal)
     extends ReadJournal
@@ -154,7 +153,6 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
    *
    * Use `NoOffset` when you want all events from the beginning of time.
    * To acquire an offset from a long unix timestamp to use with this query, you can use [[timeBasedUUIDFrom]].
-   *
    */
   override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
     scaladslReadJournal.currentEventsByTag(tag, offset).asJava
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/package.scala b/core/src/main/scala/akka/persistence/cassandra/query/package.scala
index e143ffb..0c6ec42 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/package.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/package.scala
@@ -35,10 +35,10 @@ package object query {
       val timestamp = (time - uuidEpoch) * 10000
 
       var msb = 0L
-      msb |= (0X00000000FFFFFFFFL & timestamp) << 32
-      msb |= (0X0000FFFF00000000L & timestamp) >>> 16
-      msb |= (0X0FFF000000000000L & timestamp) >>> 48
-      msb |= 0X0000000000001000L // sets the version to 1.
+      msb |= (0x00000000FFFFFFFFL & timestamp) << 32
+      msb |= (0x0000FFFF00000000L & timestamp) >>> 16
+      msb |= (0x0FFF000000000000L & timestamp) >>> 48
+      msb |= 0x0000000000001000L // sets the version to 1.
       msb
     }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
index 7045cb8..e4ed50d 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
@@ -479,7 +479,6 @@ class CassandraReadJournal protected (
    *
    * Use `NoOffset` when you want all events from the beginning of time.
    * To acquire an offset from a long unix timestamp to use with this query, you can use [[timeBasedUUIDFrom]].
-   *
    */
   override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
     currentEventsByTagInternal(tag, offset)
@@ -654,8 +653,8 @@ class CassandraReadJournal protected (
             fastForwardEnabled))
         .named(name)
     }.mapAsync(querySettings.deserializationParallelism) { row =>
-        extractor.extract(row, deserializeEventAsync)
-      }
+      extractor.extract(row, deserializeEventAsync)
+    }
       .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
   }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index bf61c26..46b71aa 100644
--- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -193,20 +193,22 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
       }
     }
 
-  /** Plugin API: deletes all snapshots matching `criteria`. This call is protected with a circuit-breaker.
+  /**
+   * Plugin API: deletes all snapshots matching `criteria`. This call is protected with a circuit-breaker.
    *
    * @param persistenceId id of the persistent actor.
    * @param criteria selection criteria for deleting. If no timestamp constraints are specified this routine
    * @note Due to the limitations of Cassandra deletion requests, this routine makes an initial query in order to obtain the
    * records matching the criteria which are then deleted in a batch deletion. Improvements in Cassandra v3.0+ mean a single
    * range deletion on the sequence number is used instead, except if timestamp constraints are specified, which still
-   * requires the original two step routine.*/
+   * requires the original two step routine.
+   */
   override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
     session.serverMetaData.flatMap { meta =>
       if (meta.isVersion2
-          || settings.cosmosDb
-          || 0L < criteria.minTimestamp
-          || criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
+        || settings.cosmosDb
+        || 0L < criteria.minTimestamp
+        || criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
         preparedSelectSnapshotMetadata.flatMap { snapshotMetaPs =>
           // this meta query gets slower than slower if snapshots are deleted without a criteria.minSequenceNr as
           // all previous tombstones are scanned in the meta data query
diff --git a/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala b/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
index 6a162fd..ee800c6 100644
--- a/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
+++ b/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
@@ -3,16 +3,16 @@ package akka.cluster.persistence.cassandra
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import akka.actor.{Actor, ActorRef, ActorSystem, Address, RootActorPath}
-import akka.cluster.{Cluster, ClusterReadView, MemberStatus, _}
+import akka.actor.{ Actor, ActorRef, ActorSystem, Address, RootActorPath }
+import akka.cluster.{ Cluster, ClusterReadView, MemberStatus, _ }
 import akka.event.Logging.ErrorLevel
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{FlightRecordingSupport, MultiNodeSpec}
+import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec }
 import akka.testkit.TestEvent._
 import akka.testkit._
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
 import org.scalatest.exceptions.TestCanceledException
-import org.scalatest.{Canceled, Outcome, Suite}
+import org.scalatest.{ Canceled, Outcome, Suite }
 
 import scala.collection.immutable
 import scala.concurrent.duration._
@@ -21,8 +21,9 @@ import scala.language.implicitConversions
 object MultiNodeClusterSpec {
 
   def clusterConfigWithFailureDetectorPuppet: Config =
-    ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").
-      withFallback(clusterConfig)
+    ConfigFactory.parseString(
+      "akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").withFallback(
+      clusterConfig)
 
   def clusterConfig(failureDetectorPuppet: Boolean): Config =
     if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig
@@ -72,20 +73,20 @@ object MultiNodeClusterSpec {
   class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
     import EndActor._
     def receive: Receive = {
-      case SendEnd ⇒
-        target foreach { t ⇒
+      case SendEnd =>
+        target.foreach { t =>
           context.actorSelection(RootActorPath(t) / self.path.elements) ! End
         }
-      case End ⇒
-        testActor forward End
+      case End =>
+        testActor.forward(End)
         sender() ! EndAck
-      case EndAck ⇒
-        testActor forward EndAck
+      case EndAck =>
+        testActor.forward(EndAck)
     }
   }
 }
 
-trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordingSupport { self: MultiNodeSpec ⇒
+trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordingSupport { self: MultiNodeSpec =>
 
   override def initialParticipants = roles.size
 
@@ -111,9 +112,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
         ".*Cluster Node.* - is starting up.*",
         ".*Shutting down cluster Node.*",
         ".*Cluster node successfully shut down.*",
-        ".*Using a dedicated scheduler for cluster.*") foreach { s ⇒
-          sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
-        }
+        ".*Using a dedicated scheduler for cluster.*").foreach { s =>
+        sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
+      }
 
       muteDeadLetters(
         classOf[akka.actor.PoisonPill],
@@ -154,11 +155,11 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   implicit def address(role: RoleName): Address = {
     cachedAddresses.get(role) match {
-      case null ⇒
+      case null =>
         val address = node(role).address
         cachedAddresses.put(role, address)
         address
-      case address ⇒ address
+      case address => address
     }
   }
 
@@ -188,7 +189,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   def startClusterNode(): Unit = {
     if (clusterView.members.isEmpty) {
-      cluster join myself
+      cluster.join(myself)
       awaitAssert(clusterView.members.map(_.address) should contain(address(myself)))
     } else
       clusterView.self
@@ -221,19 +222,20 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   def joinWithin(joinNode: RoleName, max: Duration = remainingOrDefault, interval: Duration = 1.second): Unit = {
     def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
-      clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }
+      clusterView.members.exists { m => (m.address == member) && status.contains(m.status) }
 
     cluster.join(joinNode)
-    awaitCond({
-      clusterView.refreshCurrentState()
-      if (memberInState(joinNode, List(MemberStatus.up)) &&
-        memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
-        true
-      else {
-        cluster.join(joinNode)
-        false
-      }
-    }, max, interval)
+    awaitCond(
+      {
+        clusterView.refreshCurrentState()
+        if (memberInState(joinNode, List(MemberStatus.up)) &&
+          memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
+          true
+        else {
+          cluster.join(joinNode)
+          false
+        }
+      }, max, interval)
   }
 
   /**
@@ -243,7 +245,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
   def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
     val members = gotMembers.toIndexedSeq
     members.size should ===(expectedAddresses.length)
-    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address should ===(a) }
+    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) => members(i).address should ===(a) }
   }
 
   /**
@@ -269,14 +271,14 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
     if (nodesInCluster.contains(myself)) {
-      nodesInCluster.length should not be (0)
+      nodesInCluster.length should not be 0
       val expectedLeader = roleOfLeader(nodesInCluster)
       val leader = clusterView.leader
       val isLeader = leader == Some(clusterView.selfAddress)
       assert(
         isLeader == isNode(expectedLeader),
         "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
-      clusterView.status should (be(MemberStatus.Up) or be(MemberStatus.Leaving))
+      clusterView.status should (be(MemberStatus.Up).or(be(MemberStatus.Leaving)))
     }
 
   /**
@@ -284,17 +286,17 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring.
    */
   def awaitMembersUp(
-    numberOfMembers:          Int,
-    canNotBePartOfMemberRing: Set[Address]   = Set.empty,
-    timeout:                  FiniteDuration = 25.seconds): Unit = {
+      numberOfMembers: Int,
+      canNotBePartOfMemberRing: Set[Address] = Set.empty,
+      timeout: FiniteDuration = 25.seconds): Unit = {
     within(timeout) {
       if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
-        awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) should not contain (a)))
+        awaitAssert(canNotBePartOfMemberRing.foreach(a => clusterView.members.map(_.address) should not contain a))
       awaitAssert(clusterView.members.size should ===(numberOfMembers))
       awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up)))
       // clusterView.leader is updated by LeaderChanged, await that to be updated also
       val expectedLeader = clusterView.members.collectFirst {
-        case m if m.dataCenter == cluster.settings.SelfDataCenter ⇒ m.address
+        case m if m.dataCenter == cluster.settings.SelfDataCenter => m.address
       }
       awaitAssert(clusterView.leader should ===(expectedLeader))
     }
@@ -307,7 +309,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    * Wait until the specified nodes have seen the same gossip overview.
    */
   def awaitSeenSameState(addresses: Address*): Unit =
-    awaitAssert((addresses.toSet diff clusterView.seenBy) should ===(Set.empty))
+    awaitAssert((addresses.toSet.diff(clusterView.seenBy)) should ===(Set.empty))
 
   /**
    * Leader according to the address ordering of the roles.
@@ -318,7 +320,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    * be determined from the `RoleName`.
    */
   def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
-    nodesInCluster.length should not be (0)
+    nodesInCluster.length should not be 0
     nodesInCluster.sorted.head
   }
 
@@ -332,7 +334,4 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
 
   def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)
 
-
 }
-
-
diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
index 0f0fb98..8dcd930 100644
--- a/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
@@ -54,7 +54,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
       val readJournal =
         PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
 
-      eventTags.foreach({ tag =>
+      eventTags.foreach { tag =>
         try {
           validateTagStream(readJournal)(tag)
         } catch {
@@ -66,7 +66,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
             throw new RuntimeException("Only passed the second time")
         }
 
-      })
+      }
     }
   }
 
@@ -78,14 +78,15 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
     probe.request(messagesPerPersistenceId * nrPersistenceIds)
 
     (1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { i: Long =>
-      val event = try {
-        probe.expectNext(veryLongWait)
-      } catch {
-        case e: AssertionError =>
-          system.log.error(e, s"Failed to get event: $i")
-          allReceived.filter(_._2.size != messagesPerPersistenceId).foreach(p => system.log.info("{}", p))
-          throw e
-      }
+      val event =
+        try {
+          probe.expectNext(veryLongWait)
+        } catch {
+          case e: AssertionError =>
+            system.log.error(e, s"Failed to get event: $i")
+            allReceived.filter(_._2.size != messagesPerPersistenceId).foreach(p => system.log.info("{}", p))
+            throw e
+        }
 
       allReceived += (event.persistenceId -> (event.sequenceNr :: allReceived(event.persistenceId)))
       var fail = false
diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
index 341aa18..702ee40 100644
--- a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
@@ -179,8 +179,8 @@ abstract class CassandraSpec(
             .asScala
             .foreach(row => {
               println(s"""Row:${row.getString("tag_name")},${row.getLong("timebucket")},${formatOffset(
-                row.getUuid("timestamp"))},${row.getString("persistence_id")},${row
-                .getLong("tag_pid_sequence_nr")},${row.getLong("sequence_nr")}""")
+                  row.getUuid("timestamp"))},${row.getString("persistence_id")},${row
+                  .getLong("tag_pid_sequence_nr")},${row.getLong("sequence_nr")}""")
 
             })
         }
@@ -190,7 +190,7 @@ abstract class CassandraSpec(
           .asScala
           .foreach(row => {
             println(s"""Row:${row.getLong("partition_nr")}, ${row.getString("persistence_id")}, ${row.getLong(
-              "sequence_nr")}""")
+                "sequence_nr")}""")
           })
 
         println("snapshots")
diff --git a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
index f1c9eb6..23958ba 100644
--- a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
@@ -69,7 +69,7 @@ class EventsByTagMigrationProvidePersistenceIds extends AbstractEventsByTagMigra
     val pidOne = "pOne"
     val pidTwo = "pTwo"
 
-    "support migrating a subset of persistenceIds" taggedAs (RequiresCassandraThree) in {
+    "support migrating a subset of persistenceIds" taggedAs RequiresCassandraThree in {
       writeOldTestEventWithTags(PersistentRepr("e-1", 1, pidOne), Set("blue"))
       writeOldTestEventWithTags(PersistentRepr("e-2", 2, pidOne), Set("blue"))
       writeOldTestEventWithTags(PersistentRepr("f-1", 1, pidTwo), Set("blue"))
@@ -113,7 +113,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
     val pidWithSnapshot = "pidSnapshot"
     val pidExcluded = "pidExcluded"
 
-    "have some existing tagged messages" taggedAs (RequiresCassandraThree) in {
+    "have some existing tagged messages" taggedAs RequiresCassandraThree in {
       // this one uses the 0.7 schema, soo old.
       writeOldTestEventInMessagesColumn(PersistentRepr("e-1", 1L, pidOne), Set("blue", "green", "orange"))
 
@@ -138,7 +138,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       reconciler.rebuildAllPersistenceIds().futureValue
     }
 
-    "allow creation of the new tags view table" taggedAs (RequiresCassandraThree) in {
+    "allow creation of the new tags view table" taggedAs RequiresCassandraThree in {
       migrator.createTables().futureValue shouldEqual Done
     }
 
@@ -146,7 +146,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       migrator.migrateToTagViews(filter = _ != pidExcluded).futureValue shouldEqual Done
     }
 
-    "be idempotent so it can be restarted" taggedAs (RequiresCassandraThree) in {
+    "be idempotent so it can be restarted" taggedAs RequiresCassandraThree in {
       // add some more events to be picked up
       writeOldTestEventWithTags(PersistentRepr("f-1", 1L, pidTwo), Set("green"))
       writeOldTestEventWithTags(PersistentRepr("f-2", 2L, pidTwo), Set("blue"))
@@ -159,11 +159,11 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       reconciler.rebuildAllPersistenceIds().futureValue
     }
 
-    "allow a second migration to resume from where the last one got to" taggedAs (RequiresCassandraThree) in {
+    "allow a second migration to resume from where the last one got to" taggedAs RequiresCassandraThree in {
       migrator.migrateToTagViews(filter = _ != pidExcluded).futureValue shouldEqual Done
     }
 
-    "migrate events missed during the large migration as part of actor recovery" taggedAs (RequiresCassandraThree) in {
+    "migrate events missed during the large migration as part of actor recovery" taggedAs RequiresCassandraThree in {
       // these events mimic the old version still running and persisting events
       writeOldTestEventWithTags(PersistentRepr("f-3", 3L, pidTwo), Set("green"))
       writeOldTestEventWithTags(PersistentRepr("f-4", 4L, pidTwo), Set("blue"))
@@ -172,11 +172,11 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       reconciler.rebuildAllPersistenceIds().futureValue
     }
 
-    "allow adding of the new tags column" taggedAs (RequiresCassandraThree) in {
+    "allow adding of the new tags column" taggedAs RequiresCassandraThree in {
       migrator.addTagsColumn().futureValue shouldEqual Done
     }
 
-    "work with the current implementation" taggedAs (RequiresCassandraThree) in {
+    "work with the current implementation" taggedAs RequiresCassandraThree in {
       val blueSrc: Source[EventEnvelope, NotUsed] = queries.eventsByTag("blue", NoOffset)
       val blueProbe = blueSrc.runWith(TestSink.probe[Any])
       blueProbe.request(5)
@@ -226,7 +226,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       excludedProbe.cancel()
     }
 
-    "see events missed by migration if the persistent actor is started" taggedAs (RequiresCassandraThree) in {
+    "see events missed by migration if the persistent actor is started" taggedAs RequiresCassandraThree in {
       val probe = TestProbe()
       systemTwo.actorOf(TestTaggingActor.props(pidTwo, probe = Some(probe.ref)))
       probe.expectMsg(RecoveryCompleted)
@@ -252,19 +252,19 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
     }
     // This will be left as a manual step for the user as it stops
     // rolling back to the old version
-    "allow dropping of the materialized view" taggedAs (RequiresCassandraThree) in {
+    "allow dropping of the materialized view" taggedAs RequiresCassandraThree in {
       system.log.info("Dropping old materialzied view")
       cluster.execute(SimpleStatement.newInstance(s"DROP MATERIALIZED VIEW $eventsByTagViewName"))
       system.log.info("Dropped old materialzied view")
     }
 
-    "have a peek in the messages table" taggedAs (RequiresCassandraThree) in {
+    "have a peek in the messages table" taggedAs RequiresCassandraThree in {
       val row = cluster.execute(SimpleStatement.newInstance(s"select * from ${messagesTableName} limit 1")).one()
       system.log.debug("New messages table looks like: {}", row)
       system.log.debug("{}", row.getColumnDefinitions)
     }
 
-    "be able to add tags to existing pids" taggedAs (RequiresCassandraThree) in {
+    "be able to add tags to existing pids" taggedAs RequiresCassandraThree in {
       // we need a new actor system for this as the old one will have prepared the statements without
       // the tags column existing
       val pidOnePA = systemTwo.actorOf(TestTaggingActor.props(pidOne, Set("blue", "yellow")))
@@ -283,7 +283,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       blueProbe.expectNextPF {
         case EventEnvelope(_, `pidWithMeta`, 1, "g-1") =>
       }
-      blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 4, "f-4")         => }
+      blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 4, "f-4") => }
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 5, "new-event-1") => }
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 6, "new-event-2") => }
       blueProbe.expectNoMessage(waitTime)
@@ -293,13 +293,13 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
 
     // Again a manual step, leaving them is only wasting disk space
     // the new version will work with these columns still there
-    "allow dropping of tag columns" taggedAs (RequiresCassandraThree) in {
+    "allow dropping of tag columns" taggedAs RequiresCassandraThree in {
       cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag1")
       cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag2")
       cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag3")
     }
 
-    "still work after dropping the tag columns" taggedAs (RequiresCassandraThree) in {
+    "still work after dropping the tag columns" taggedAs RequiresCassandraThree in {
       val pidTwoPA = systemThree.actorOf(TestTaggingActor.props(pidTwo, Set("orange")))
       pidTwoPA ! "new-event-1"
       expectMsg(Ack)
@@ -309,7 +309,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       val orangeSrc: Source[EventEnvelope, NotUsed] = queriesThree.eventsByTag("orange", NoOffset)
       val orangeProbe = orangeSrc.runWith(TestSink.probe[Any])(SystemMaterializer(systemThree).materializer)
       orangeProbe.request(3)
-      orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1")         => }
+      orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 5, "new-event-1") => }
       orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 6, "new-event-2") => }
       orangeProbe.expectNoMessage(waitTime)
diff --git a/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala b/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
index be88d4d..3241db6 100644
--- a/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
@@ -29,9 +29,9 @@ class RetriesSpec
       @volatile var called = 0
       val result = Retries
         .retry(() => {
-          called += 1
-          Future.failed(new RuntimeException(s"cats $called"))
-        }, 3, (_, exc, _) => failProbe.ref ! exc, 1.milli, 2.millis, 0.1)
+            called += 1
+            Future.failed(new RuntimeException(s"cats $called"))
+          }, 3, (_, exc, _) => failProbe.ref ! exc, 1.milli, 2.millis, 0.1)
         .failed
         .futureValue
       called shouldEqual 3
diff --git a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
index 40d85b8..7cc691e 100644
--- a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
@@ -96,7 +96,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       cleanup.deleteAllEvents(pid, neverUsePersistenceIdAgain = true).futureValue
 
       // also delete from all_persistence_ids
-      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain (pid)
+      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain pid
 
       val p2 = system.actorOf(TestActor.props(pid))
       p2 ! GetRecoveredState
@@ -192,7 +192,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       cleanup.deleteAll(pid, neverUsePersistenceIdAgain = true).futureValue
 
       // also delete from all_persistence_ids
-      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain (pid)
+      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain pid
 
       val p2 = system.actorOf(TestActor.props(pid))
       p2 ! GetRecoveredState
@@ -202,7 +202,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       queries.currentEventsByTag(tag = "tag-b", offset = NoOffset).runWith(Sink.seq).futureValue.size should ===(0)
     }
 
-    "delete some for one persistenceId" taggedAs (RequiresCassandraThree) in {
+    "delete some for one persistenceId" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 8).foreach { i =>
@@ -219,7 +219,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       expectMsg(RecoveredState("", List("evt-6", "evt-7", "evt-8"), 8L))
     }
 
-    "clean up before latest snapshot for one persistence id" taggedAs (RequiresCassandraThree) in {
+    "clean up before latest snapshot for one persistence id" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 3).foreach { i =>
@@ -259,7 +259,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
         .futureValue shouldEqual List("evt-7", "evt-8", "evt-9")
     }
 
-    "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs (RequiresCassandraThree) in {
+    "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 3).foreach { i =>
@@ -299,7 +299,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
         .futureValue shouldEqual List("evt-4", "evt-5", "evt-6", "evt-7", "evt-8", "evt-9")
     }
 
-    "clean up before snapshot including timestamp for one persistence id" taggedAs (RequiresCassandraThree) in {
+    "clean up before snapshot including timestamp for one persistence id" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 3).foreach { i =>
@@ -440,7 +440,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
   }
 
   "Time and snapshot based cleanup" must {
-    "keep the correct  number of snapshots" taggedAs (RequiresCassandraThree) in {
+    "keep the correct  number of snapshots" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -457,7 +457,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
 
       oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000))
     }
-    "keep the all snapshots if fewer than requested without timestamp" taggedAs (RequiresCassandraThree) in {
+    "keep the all snapshots if fewer than requested without timestamp" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -477,7 +477,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
 
       oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 1, 1000))
     }
-    "keep the all snapshots if fewer than requested with timestamp" taggedAs (RequiresCassandraThree) in {
+    "keep the all snapshots if fewer than requested with timestamp" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -512,7 +512,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       oldestSnapshot shouldEqual None
     }
 
-    "don't delete snapshots newer than the oldest date" taggedAs (RequiresCassandraThree) in {
+    "don't delete snapshots newer than the oldest date" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -534,7 +534,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
 
       oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000))
     }
-    "keep snapshots older than the oldest date to meet snapshotsToKeep" taggedAs (RequiresCassandraThree) in {
+    "keep snapshots older than the oldest date to meet snapshotsToKeep" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
diff --git a/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala b/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
index 3b54095..dd728c3 100644
--- a/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
@@ -47,7 +47,7 @@ class CassandraCompactionStrategySpec
       compactionStrategy.compactionWindowUnit shouldEqual TimeUnit.DAYS
     }
 
-    "successfully create CQL from TimeWindowCompactionStrategy" taggedAs (RequiresCassandraThree) in {
+    "successfully create CQL from TimeWindowCompactionStrategy" taggedAs RequiresCassandraThree in {
       val twConfig = ConfigFactory.parseString("""journal.table-compaction-strategy {
           | class = "TimeWindowCompactionStrategy"
           | compaction_window_size = 1
@@ -57,7 +57,7 @@ class CassandraCompactionStrategySpec
 
       val cqlExpression =
         s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable1 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
-          twConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+            twConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
 
       noException must be thrownBy {
         cluster.execute(cqlExpression)
@@ -112,7 +112,7 @@ class CassandraCompactionStrategySpec
 
       val cqlExpression =
         s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable2 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
-          uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+            uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
 
       noException must be thrownBy {
         cluster.execute(cqlExpression)
@@ -165,7 +165,7 @@ class CassandraCompactionStrategySpec
 
       val cqlExpression =
         s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable3 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
-          uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+            uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
 
       noException must be thrownBy {
         cluster.execute(cqlExpression)
diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
index 254452e..1eab4c1 100644
--- a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
@@ -59,10 +59,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
   "Cassandra query EventsByPersistenceId" must {
 
     "not replay dropped events by the event-adapter" in {
-      setup("a", 6, {
-        case x if x % 2 == 0 => "dropped:"
-        case _               => ""
-      })
+      setup("a", 6,
+        {
+          case x if x % 2 == 0 => "dropped:"
+          case _               => ""
+        })
 
       val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
       src
@@ -78,10 +79,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
 
     "replay duplicate events by the event-adapter" in {
 
-      setup("b", 3, {
-        case x if x % 2 == 0 => "duplicated:"
-        case _               => ""
-      })
+      setup("b", 3,
+        {
+          case x if x % 2 == 0 => "duplicated:"
+          case _               => ""
+        })
 
       val src = queries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
       src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNext("b-1", "b-2", "b-2", "b-3").expectComplete()
@@ -100,10 +102,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
   "Cassandra query EventsByTag" must {
     "not replay events dropped by the event-adapter" in {
 
-      setup("d", 6, tagged("red") {
-        case x if x % 2 == 0 => "dropped:"
-        case _               => ""
-      })
+      setup("d", 6,
+        tagged("red") {
+          case x if x % 2 == 0 => "dropped:"
+          case _               => ""
+        })
 
       val src = queries.eventsByTag("red", NoOffset)
       val sub = src.map(_.event).runWith(TestSink.probe[Any])
@@ -115,10 +118,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
 
     "replay events duplicated by the event-adapter" in {
 
-      setup("e", 3, tagged("yellow") {
-        case x if x % 2 == 0 => "duplicated:"
-        case _               => ""
-      })
+      setup("e", 3,
+        tagged("yellow") {
+          case x if x % 2 == 0 => "duplicated:"
+          case _               => ""
+        })
 
       val src = queries.eventsByTag("yellow", NoOffset)
       val sub = src.map(_.event).runWith(TestSink.probe[Any])
@@ -129,10 +133,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
 
     "replay events transformed by the event-adapter" in {
 
-      setup("e", 3, tagged("green") {
-        case x if x % 2 == 0 => "prefixed:foo:"
-        case _               => ""
-      })
+      setup("e", 3,
+        tagged("green") {
+          case x if x % 2 == 0 => "prefixed:foo:"
+          case _               => ""
+        })
 
       val src = queries.eventsByTag("green", NoOffset)
       val sub = src.map(_.event).runWith(TestSink.probe[Any])
diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
index c37cdc8..92c20ed 100644
--- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
@@ -200,7 +200,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
       val probe = greenSrc.runWith(TestSink.probe[Any])
       probe.request(2)
-      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple")  => e }
+      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
       probe.expectNoMessage(500.millis)
       probe.request(2)
@@ -233,7 +233,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
       val probe = greenSrc.runWith(TestSink.probe[Any])
       probe.request(2)
-      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple")  => e }
+      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
       probe.expectNoMessage(waitTime)
 
@@ -274,7 +274,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       if (appleTimestamp == bananaTimestamp)
         probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
       probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
-      probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf")   => e }
+      probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
       probe2.cancel()
     }
 
@@ -349,22 +349,23 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
 
     "find new events" in {
       val d = system.actorOf(TestActor.props("d"))
-      withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(2)
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car") => e }
-        probe.expectNoMessage(waitTime)
+      withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(2)
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car") => e }
+          probe.expectNoMessage(waitTime)
 
-        d ! "a black dog"
-        expectMsg(s"a black dog-done")
-        d ! "a black night"
-        expectMsg(s"a black night-done")
+          d ! "a black dog"
+          expectMsg(s"a black dog-done")
+          d ! "a black night"
+          expectMsg(s"a black night-done")
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "d", 1L, "a black dog") => e }
-        probe.expectNoMessage(waitTime)
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "d", 2L, "a black night") => e }
-        probe.cancel()
-      })
+          probe.expectNextPF { case e @ EventEnvelope(_, "d", 1L, "a black dog") => e }
+          probe.expectNoMessage(waitTime)
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "d", 2L, "a black night") => e }
+          probe.cancel()
+        })
     }
 
     "find events from timestamp offset" in {
@@ -394,8 +395,8 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
               probe2.request(10)
               if (appleTimestamp == bananaTimestamp)
                 probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
-              probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana")   => e }
-              probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf")     => e }
+              probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
+              probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
               probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
               probe2.expectNoMessage(waitTime)
             })
@@ -404,19 +405,20 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
     }
 
     "find events from UUID offset " in {
-      withProbe(queries.eventsByTag(tag = "green", offset = NoOffset).runWith(TestSink.probe[Any]), probe1 => {
-        probe1.request(2)
-        probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
-        val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }.offset
-        probe1.cancel()
-
-        val greenSrc2 = queries.eventsByTag(tag = "green", offs)
-        val probe2 = greenSrc2.runWith(TestSink.probe[Any])
-        probe2.request(10)
-        probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf")     => e }
-        probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
-        probe2.expectNoMessage(waitTime)
-      })
+      withProbe(queries.eventsByTag(tag = "green", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe1 => {
+          probe1.request(2)
+          probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
+          val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }.offset
+          probe1.cancel()
+
+          val greenSrc2 = queries.eventsByTag(tag = "green", offs)
+          val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+          probe2.request(10)
+          probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
+          probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
+          probe2.expectNoMessage(waitTime)
+        })
     }
 
     "include timestamp in EventEnvelope" in {
@@ -449,21 +451,22 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val pr2 = PersistentRepr("e2", 2L, "p1", "", writerUuid = w1)
       writeTaggedEvent(t2, pr2, Set("T1-live"), 2, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T1-live", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
-
-        val t3 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
-        val pr3 = PersistentRepr("e3", 3L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t3, pr3, Set("T1-live"), 3, bucketSize)
-        val t4 = LocalDateTime.now(ZoneOffset.UTC)
-        val pr4 = PersistentRepr("e4", 4L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t4, pr4, Set("T1-live"), 4, bucketSize)
-
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "e3") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 4L, "e4") => e }
-      })
+      withProbe(queries.eventsByTag(tag = "T1-live", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
+
+          val t3 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
+          val pr3 = PersistentRepr("e3", 3L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t3, pr3, Set("T1-live"), 3, bucketSize)
+          val t4 = LocalDateTime.now(ZoneOffset.UTC)
+          val pr4 = PersistentRepr("e4", 4L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t4, pr4, Set("T1-live"), 4, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "e3") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 4L, "e4") => e }
+        })
     }
 
     "sort events by timestamp" in {
@@ -476,21 +479,22 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val pr3 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
       writeTaggedEvent(t3, pr3, Set("T2"), 2, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T2", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-
-        // simulate async eventually consistent Materialized View update
-        // that cause p1-e2 to show up before p2-e1
-        Thread.sleep(500)
-        val t2 = t3.minus(1, ChronoUnit.MILLIS)
-        val pr2 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
-        writeTaggedEvent(t2, pr2, Set("T2"), 1, bucketSize)
+      withProbe(queries.eventsByTag(tag = "T2", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
-        val e3 = probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
-        (System.currentTimeMillis() - e3.timestamp) should be < 10000L
-      })
+          // simulate async eventually consistent Materialized View update
+          // that cause p1-e2 to show up before p2-e1
+          Thread.sleep(500)
+          val t2 = t3.minus(1, ChronoUnit.MILLIS)
+          val pr2 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
+          writeTaggedEvent(t2, pr2, Set("T2"), 1, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
+          val e3 = probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
+          (System.currentTimeMillis() - e3.timestamp) should be < 10000L
+        })
     }
 
     "stream many events" in {
@@ -604,21 +608,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val p2e1 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
       writeTaggedEvent(t2, p2e1, Set("T6"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T6", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
-
-        // delayed, and timestamp is before p2-e1
-        val t3 = t1.plusSeconds(1)
-        val p1e2 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t3, p1e2, Set("T6"), 2, bucketSize)
-        val p1e3 = PersistentRepr("p1-e3", 3L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t2.plusSeconds(1), p1e3, Set("T6"), 3, bucketSize)
-
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "p1-e3") => e }
-      })
+      withProbe(queries.eventsByTag(tag = "T6", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
+
+          // delayed, and timestamp is before p2-e1
+          val t3 = t1.plusSeconds(1)
+          val p1e2 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t3, p1e2, Set("T6"), 2, bucketSize)
+          val p1e3 = PersistentRepr("p1-e3", 3L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t2.plusSeconds(1), p1e3, Set("T6"), 3, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "p1-e3") => e }
+        })
     }
 
     "find delayed events 2" in {
@@ -630,21 +635,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t2, eventA1, Set("T7"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T7", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
+      withProbe(queries.eventsByTag(tag = "T7", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
 
-        // delayed, timestamp is before A1
-        val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t1, eventB1, Set("T7"), 1, bucketSize)
-        // second delayed is after A1 so should be found and trigger a search for B1
-        val t3 = t1.plusSeconds(2)
-        val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t3, eventB2, Set("T7"), 2, bucketSize)
+          // delayed, timestamp is before A1
+          val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t1, eventB1, Set("T7"), 1, bucketSize)
+          // second delayed is after A1 so should be found and trigger a search for B1
+          val t3 = t1.plusSeconds(2)
+          val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t3, eventB2, Set("T7"), 2, bucketSize)
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e } // failed in travis
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
-      })
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e } // failed in travis
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+        })
     }
 
     "find delayed events 3" in {
@@ -659,21 +665,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t2, eventA1, Set("T8"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T8", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
-
-        // delayed, timestamp is before A1
-        val eventB1 = PersistentRepr("B1", 2L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t1, eventB1, Set("T8"), 2, bucketSize)
-        val t3 = t1.plusSeconds(2)
-        val eventB2 = PersistentRepr("B2", 3L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t3, eventB2, Set("T8"), 3, bucketSize)
-
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 3L, "B2") => e }
-      })
+      withProbe(queries.eventsByTag(tag = "T8", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
+
+          // delayed, timestamp is before A1
+          val eventB1 = PersistentRepr("B1", 2L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t1, eventB1, Set("T8"), 2, bucketSize)
+          val t3 = t1.plusSeconds(2)
+          val eventB2 = PersistentRepr("B2", 3L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t3, eventB2, Set("T8"), 3, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 3L, "B2") => e }
+        })
     }
 
     "find delayed events from offset" in {
@@ -684,25 +691,27 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t1.plusSeconds(2), eventA1, Set("T9"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T9", offset = NoOffset).runWith(TestSink.probe[Any]), probe1 => {
-        probe1.request(10)
-        val offs =
-          probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }.offset.asInstanceOf[TimeBasedUUID]
+      withProbe(queries.eventsByTag(tag = "T9", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe1 => {
+          probe1.request(10)
+          val offs =
+            probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }.offset.asInstanceOf[TimeBasedUUID]
 
-        withProbe(queries.eventsByTag(tag = "T9", offset = offs).runWith(TestSink.probe[Any]), probe2 => {
-          probe2.request(10)
+          withProbe(queries.eventsByTag(tag = "T9", offset = offs).runWith(TestSink.probe[Any]),
+            probe2 => {
+              probe2.request(10)
 
-          // delayed, timestamp is before A1, i.e. before the offset so should not be picked up
-          val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
-          writeTaggedEvent(t1.plusSeconds(1), eventB1, Set("T9"), 1, bucketSize)
+              // delayed, timestamp is before A1, i.e. before the offset so should not be picked up
+              val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+              writeTaggedEvent(t1.plusSeconds(1), eventB1, Set("T9"), 1, bucketSize)
 
-          // delayed, timestamp is after A1 so should be picked up
-          val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
-          writeTaggedEvent(t1.plusSeconds(3), eventB2, Set("T9"), 2, bucketSize)
+              // delayed, timestamp is after A1 so should be picked up
+              val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+              writeTaggedEvent(t1.plusSeconds(3), eventB2, Set("T9"), 2, bucketSize)
 
-          probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+              probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+            })
         })
-      })
     }
 
     // Not supported atm as it requires us to back track without seeing a future event
@@ -717,29 +726,30 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
         writeTaggedEvent(t1.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
       }
 
-      withProbe(queries.eventsByTag(tag = "T10", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(1000)
-        probe.expectNextN(100)
+      withProbe(queries.eventsByTag(tag = "T10", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(1000)
+          probe.expectNextN(100)
 
-        val t2 = t1.plusSeconds(1)
-        (101L to 200L).foreach { n =>
-          val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1)
-          writeTaggedEvent(t2.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
-        }
+          val t2 = t1.plusSeconds(1)
+          (101L to 200L).foreach { n =>
+            val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1)
+            writeTaggedEvent(t2.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
+          }
 
-        // delayed, timestamp is before A101 but after A100
-        val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t2.minus(100, ChronoUnit.MILLIS), eventB1, Set("T10"), 1, bucketSize)
+          // delayed, timestamp is before A101 but after A100
+          val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t2.minus(100, ChronoUnit.MILLIS), eventB1, Set("T10"), 1, bucketSize)
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e }
 
-        // Now A101 - A200 can be delivered
-        probe.expectNextN(100)
+          // Now A101 - A200 can be delivered
+          probe.expectNextN(100)
 
-        val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t2.plusSeconds(1), eventB2, Set("T10"), 2, bucketSize)
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
-      })
+          val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t2.plusSeconds(1), eventB2, Set("T10"), 2, bucketSize)
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+        })
     }
 
     "find events from many persistenceIds" in {
@@ -815,10 +825,11 @@ class EventsByTagStrictBySeqNoEarlyFirstOffsetSpec
 
       // the search for delayed events should start before we get to the current timebucket
       // until 0.26/0.51 backtracking was broken and events would be skipped
-      withProbe(queries.eventsByTag(tag = "T11", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(2000)
-        probe.expectNextN(2000)
-      })
+      withProbe(queries.eventsByTag(tag = "T11", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(2000)
+          probe.expectNextN(2000)
+        })
     }
   }
 }
@@ -851,27 +862,30 @@ class EventsByTagLongRefreshIntervalSpec
     sender.expectNoMessage(200.millis) // try and give time for the tagged event to be flushed so the query doesn't need to wait for the refresh interval
 
     val offset: Offset =
-      withProbe(queries.eventsByTag(tag = "animal", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(2)
-        probe.expectNextPF {
-          case EventEnvelope(offset, `pid`, 1L, "cat") =>
-            offset
-        }
-      })
+      withProbe(queries.eventsByTag(tag = "animal", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(2)
+          probe.expectNextPF {
+            case EventEnvelope(offset, `pid`, 1L, "cat") =>
+              offset
+          }
+        })
 
     pa.tell(Tagged("cat2", Set("animal")), sender.ref)
     sender.expectMsg("cat2-done")
     // flush interval for tag writes is 0ms but still give some time for the tag write to complete
     sender.expectNoMessage(250.millis)
 
-    withProbe(queries.eventsByTag(tag = "animal", offset = offset).runWith(TestSink.probe[Any]), probe => {
-      probe.request(2)
-      // less than the refresh interval, previously this would evaluate the new persistence-id timeout and then not re-evaluate
-      // it again until the next refresh interval
-      probe.expectNextWithTimeoutPF(2.seconds, {
-        case EventEnvelope(_, `pid`, 2L, "cat2") =>
+    withProbe(queries.eventsByTag(tag = "animal", offset = offset).runWith(TestSink.probe[Any]),
+      probe => {
+        probe.request(2)
+        // less than the refresh interval, previously this would evaluate the new persistence-id timeout and then not re-evaluate
+        // it again until the next refresh interval
+        probe.expectNextWithTimeoutPF(2.seconds,
+          {
+            case EventEnvelope(_, `pid`, 2L, "cat2") =>
+          })
       })
-    })
   }
 }
 
@@ -1011,29 +1025,30 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even
           writeTaggedEvent(t2, eventB, Set("T14"), n - 112, bucketSize)
       }
 
-      withProbe(queries.eventsByTag(tag = "T14", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
+      withProbe(queries.eventsByTag(tag = "T14", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
 
-        val requested1 = 130L
-        probe.request(requested1)
-        val expected1 = 100L + 12 * 2
-        probe.expectNextN(expected1)
-        probe.expectNoMessage(2.seconds)
+          val requested1 = 130L
+          probe.request(requested1)
+          val expected1 = 100L + 12 * 2
+          probe.expectNextN(expected1)
+          probe.expectNoMessage(2.seconds)
 
-        system.log.debug("writing missing event, 113, and a bunch of delayed from C")
-        (1L to 100L).foreach { n =>
-          val eventC = PersistentRepr(s"C$n", n, "c", "", writerUuid = w3)
-          val t = t1.plus(3 * n + 2, ChronoUnit.MILLIS)
-          writeTaggedEvent(t, eventC, Set("T14"), n, bucketSize)
-        }
-        writeTaggedEvent(missingEventTime, missingEvent, Set("T14"), 101, bucketSize)
-        val expected2 = requested1 - expected1
-        probe.expectNextN(expected2)
-        probe.expectNoMessage(200.millis)
-
-        probe.request(1000)
-        probe.expectNextN(8 + 100 - expected2)
-        probe.expectNoMessage(200.millis)
-      })
+          system.log.debug("writing missing event, 113, and a bunch of delayed from C")
+          (1L to 100L).foreach { n =>
+            val eventC = PersistentRepr(s"C$n", n, "c", "", writerUuid = w3)
+            val t = t1.plus(3 * n + 2, ChronoUnit.MILLIS)
+            writeTaggedEvent(t, eventC, Set("T14"), n, bucketSize)
+          }
+          writeTaggedEvent(missingEventTime, missingEvent, Set("T14"), 101, bucketSize)
+          val expected2 = requested1 - expected1
+          probe.expectNextN(expected2)
+          probe.expectNoMessage(200.millis)
+
+          probe.request(1000)
+          probe.expectNextN(8 + 100 - expected2)
+          probe.expectNoMessage(200.millis)
+        })
     }
 
     "find all events" in {
diff --git a/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala b/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
index 487f409..df7c4a2 100644
--- a/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
@@ -52,8 +52,8 @@ class CassandraLauncherSpec
         CassandraLauncher.classpathForResources("logback-test.xml"))
 
       awaitAssert({
-        testCassandra()
-      }, 45.seconds)
+          testCassandra()
+        }, 45.seconds)
 
       CassandraLauncher.stop()
 
diff --git a/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala b/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
index a7c7a72..913950c 100644
--- a/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
+++ b/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
@@ -15,7 +15,7 @@ import akka.persistence.cassandra.reconciler.Reconciliation
 
 class AllPersistenceIdsMigrationCompileOnly {
 
-  //#migrate
+  // #migrate
   // System should have the same Cassandra plugin configuration as your application
   // but be careful to remove seed nodes so this doesn't join the cluster
   val system = ActorSystem()
@@ -32,5 +32,5 @@ class AllPersistenceIdsMigrationCompileOnly {
       system.log.error(e, "All persistenceIds migration failed.")
       system.terminate()
   }
-  //#migrate
+  // #migrate
 }
diff --git a/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala b/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
index af8b63b..8c321bd 100644
--- a/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
+++ b/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
@@ -14,7 +14,7 @@ import akka.Done
 
 class ReconciliationCompileOnly {
 
-  //#reconcile
+  // #reconcile
   // System should have the same Cassandra plugin configuration as your application
   // but be careful to remove seed nodes so this doesn't join the cluster
   val system = ActorSystem()
@@ -32,5 +32,5 @@ class ReconciliationCompileOnly {
     // optional: re-build, if this is ommited then it will be re-build next time the pid is started
     _ <- rec.rebuildTagViewForPersistenceIds(pid)
   } yield Done
-  //#reconcile
+  // #reconcile
 }
diff --git a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
index 1a68ec9..103d25d 100644
--- a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
+++ b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
@@ -12,7 +12,7 @@ object CleanupDocExample {
 
   implicit val system: ActorSystem = ???
 
-  //#cleanup
+  // #cleanup
   val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
   val cleanup = new Cleanup(system)
 
@@ -31,6 +31,6 @@ object CleanupDocExample {
     .mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant.toEpochMilli))
     .run()
 
-  //#cleanup
+  // #cleanup
 
 }
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
index 2ea8d58..a50e3e6 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
@@ -38,8 +38,8 @@ class EventProcessorStream[Event: ClassTag](
           readOffset().map { offset =>
             log.infoN("Starting stream for tag [{}] from offset [{}]", tag, offset)
             processEventsByTag(offset, histogram)
-            // groupedWithin can be used here to improve performance by reducing number of offset writes,
-            // with the trade-off of possibility of more duplicate events when stream is restarted
+              // groupedWithin can be used here to improve performance by reducing number of offset writes,
+              // with the trade-off of possibility of more duplicate events when stream is restarted
               .mapAsync(1)(writeOffset)
           }
         }
@@ -52,21 +52,21 @@ class EventProcessorStream[Event: ClassTag](
     query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope =>
       eventEnvelope.event match {
         case event: Event => {
-          // Times from different nodes, take with a pinch of salt
-          val latency = System.currentTimeMillis() - eventEnvelope.timestamp
-          // when restarting without the offset the latency will be too big
-          if (latency < histogram.getMaxValue) {
-            histogram.recordValue(latency)
-          }
-          log.debugN(
-            "Tag {} Event {} persistenceId {}, sequenceNr {}. Latency {}",
-            tag,
-            event,
-            PersistenceId.ofUniqueId(eventEnvelope.persistenceId),
-            eventEnvelope.sequenceNr,
-            latency)
-          Future.successful(Done)
-        }.map(_ => eventEnvelope.offset)
+            // Times from different nodes, take with a pinch of salt
+            val latency = System.currentTimeMillis() - eventEnvelope.timestamp
+            // when restarting without the offset the latency will be too big
+            if (latency < histogram.getMaxValue) {
+              histogram.recordValue(latency)
+            }
+            log.debugN(
+              "Tag {} Event {} persistenceId {}, sequenceNr {}. Latency {}",
+              tag,
+              event,
+              PersistenceId.ofUniqueId(eventEnvelope.persistenceId),
+              eventEnvelope.sequenceNr,
+              latency)
+            Future.successful(Done)
+          }.map(_ => eventEnvelope.offset)
         case other =>
           Future.failed(new IllegalArgumentException(s"Unexpected event [${other.getClass.getName}]"))
       }
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
index 8809898..183b90f 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
@@ -18,22 +18,22 @@ object Main {
   def main(args: Array[String]): Unit = {
 
     ActorSystem(Behaviors.setup[SelfUp] {
-      ctx =>
-        val readSettings = ReadSide.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
-        val writeSettings = ConfigurablePersistentActor.Settings(readSettings.nrTags)
-        val loadSettings = LoadGenerator.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
+        ctx =>
+          val readSettings = ReadSide.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
+          val writeSettings = ConfigurablePersistentActor.Settings(readSettings.nrTags)
+          val loadSettings = LoadGenerator.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
 
-        AkkaManagement(ctx.system).start()
-        ClusterBootstrap(ctx.system).start()
-        val cluster = Cluster(ctx.system)
-        cluster.subscriptions ! Subscribe(ctx.self, classOf[SelfUp])
+          AkkaManagement(ctx.system).start()
+          ClusterBootstrap(ctx.system).start()
+          val cluster = Cluster(ctx.system)
+          cluster.subscriptions ! Subscribe(ctx.self, classOf[SelfUp])
 
-        val topic = ReadSideTopic.init(ctx)
+          val topic = ReadSideTopic.init(ctx)
 
-        if (cluster.selfMember.hasRole("read")) {
-          val session = CassandraSessionRegistry(ctx.system).sessionFor("akka.persistence.cassandra")
-          val offsetTableStmt =
-            """
+          if (cluster.selfMember.hasRole("read")) {
+            val session = CassandraSessionRegistry(ctx.system).sessionFor("akka.persistence.cassandra")
+            val offsetTableStmt =
+              """
               CREATE TABLE IF NOT EXISTS akka.offsetStore (
                 eventProcessorId text,
                 tag text,
@@ -42,27 +42,27 @@ object Main {
               )
            """
 
-          Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
-        }
+            Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
+          }
 
-        Behaviors.receiveMessage {
-          case SelfUp(state) =>
-            ctx.log.infoN(
-              "Cluster member joined. Initializing persistent actors. Roles {}. Members {}",
-              cluster.selfMember.roles,
-              state.members)
-            val ref = ConfigurablePersistentActor.init(writeSettings, ctx.system)
-            if (cluster.selfMember.hasRole("read")) {
-              ctx.spawnAnonymous(Reporter(topic))
-            }
-            ReadSide(ctx.system, topic, readSettings)
-            if (cluster.selfMember.hasRole("load")) {
-              ctx.log.info("Starting load generation")
-              val load = ctx.spawn(LoadGenerator(loadSettings, ref), "load-generator")
-              load ! Start(10.seconds)
-            }
-            Behaviors.empty
-        }
-    }, "apc-example")
+          Behaviors.receiveMessage {
+            case SelfUp(state) =>
+              ctx.log.infoN(
+                "Cluster member joined. Initializing persistent actors. Roles {}. Members {}",
+                cluster.selfMember.roles,
+                state.members)
+              val ref = ConfigurablePersistentActor.init(writeSettings, ctx.system)
+              if (cluster.selfMember.hasRole("read")) {
+                ctx.spawnAnonymous(Reporter(topic))
+              }
+              ReadSide(ctx.system, topic, readSettings)
+              if (cluster.selfMember.hasRole("load")) {
+                ctx.log.info("Starting load generation")
+                val load = ctx.spawn(LoadGenerator(loadSettings, ref), "load-generator")
+                load ! Start(10.seconds)
+              }
+              Behaviors.empty
+          }
+      }, "apc-example")
   }
 }
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
index 91ca0d8..81f64d4 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
@@ -47,7 +47,7 @@ object ReadSide {
     Behaviors.withTimers { timers =>
       timers.startTimerAtFixedRate(ReportMetrics, 10.second)
       Behaviors.setup { ctx =>
-        val start = (settings.tagsPerProcessor * nr)
+        val start = settings.tagsPerProcessor * nr
         val end = start + (settings.tagsPerProcessor) - 1
         val tags = (start to end).map(i => s"tag-$i")
         ctx.log.info("Processor {} processing tags {}", nr, tags)
@@ -59,13 +59,12 @@ object ReadSide {
         // having more tags will also increase write throughput/latency as it'll write to
         // many partitions
         // downside is running many streams/queries against c*
-        tags.foreach(
-          tag =>
-            new EventProcessorStream[ConfigurablePersistentActor.Event](
-              ctx.system,
-              ctx.executionContext,
-              s"processor-$nr",
-              tag).runQueryStream(killSwitch, histogram))
+        tags.foreach(tag =>
+          new EventProcessorStream[ConfigurablePersistentActor.Event](
+            ctx.system,
+            ctx.executionContext,
+            s"processor-$nr",
+            tag).runQueryStream(killSwitch, histogram))
 
         Behaviors
           .receiveMessage[Command] {
diff --git a/project/Common.scala b/project/Common.scala
index 4566f7d..83e9176 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -22,14 +22,14 @@ object Common extends AutoPlugin {
       homepage := Some(url("https://akka.io")),
       // apiURL defined in projectSettings because version.value is not correct here
       scmInfo := Some(
-          ScmInfo(
-            url("https://github.com/akka/akka-persistence-cassandra"),
-            "git@github.com:akka/akka-persistence-cassandra.git")),
+        ScmInfo(
+          url("https://github.com/akka/akka-persistence-cassandra"),
+          "git@github.com:akka/akka-persistence-cassandra.git")),
       developers += Developer(
-          "contributors",
-          "Contributors",
-          "https://gitter.im/akka/dev",
-          url("https://github.com/akka/akka-persistence-cassandra/graphs/contributors")),
+        "contributors",
+        "Contributors",
+        "https://gitter.im/akka/dev",
+        url("https://github.com/akka/akka-persistence-cassandra/graphs/contributors")),
       licenses := Seq(("Apache-2.0", url("https://www.apache.org/licenses/LICENSE-2.0"))),
       description := "A Cassandra plugin for Akka Persistence.")
 
@@ -41,27 +41,27 @@ object Common extends AutoPlugin {
     scalacOptions ++= Seq("-encoding", "UTF-8", "-feature", "-unchecked", "-Xlint", "-Ywarn-dead-code", "-deprecation"),
     Compile / console / scalacOptions --= Seq("-deprecation", "-Xfatal-warnings", "-Xlint", "-Ywarn-unused:imports"),
     Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
-        "-doc-title",
-        "Akka Persistence Cassandra",
-        "-doc-version",
-        version.value,
-        "-sourcepath",
-        (ThisBuild / baseDirectory).value.toString,
-        "-doc-source-url", {
-          val branch = if (isSnapshot.value) "master" else s"v${version.value}"
-          s"https://github.com/akka/akka-persistence-cassandra/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
-        },
-        "-doc-canonical-base-url",
-        "https://doc.akka.io/api/akka-persistence-cassandra/current/",
-        "-skip-packages",
-        "akka.pattern" // for some reason Scaladoc creates this
-      ),
+      "-doc-title",
+      "Akka Persistence Cassandra",
+      "-doc-version",
+      version.value,
+      "-sourcepath",
+      (ThisBuild / baseDirectory).value.toString,
+      "-doc-source-url", {
+        val branch = if (isSnapshot.value) "master" else s"v${version.value}"
+        s"https://github.com/akka/akka-persistence-cassandra/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
+      },
+      "-doc-canonical-base-url",
+      "https://doc.akka.io/api/akka-persistence-cassandra/current/",
+      "-skip-packages",
+      "akka.pattern" // for some reason Scaladoc creates this
+    ),
     Compile / doc / scalacOptions --= Seq("-Xfatal-warnings"),
     scalafmtOnCompile := true,
     autoAPIMappings := true,
     apiURL := Some(url(s"https://doc.akka.io/api/akka-persistence-cassandra/${projectInfoVersion.value}")),
     headerLicense := Some(
-        HeaderLicense.Custom("""Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>""")),
+      HeaderLicense.Custom("""Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>""")),
     sonatypeProfileName := "com.typesafe",
     Test / logBuffered := System.getProperty("akka.logBufferedTests", "false").toBoolean,
     // show full stack traces and test case durations
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6e26982..da1e3a4 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -36,15 +36,15 @@ object Dependencies {
     "com.typesafe.akka" %% "akka-cluster-sharding")
 
   val akkaPersistenceCassandraDependencies = Seq(
-      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % AlpakkaVersion,
-      "com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
-      "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
-      "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
-      "org.scala-lang.modules" %% "scala-collection-compat" % "2.4.4",
-      Logback % Test,
-      "org.scalatest" %% "scalatest" % "3.2.11" % Test,
-      "org.pegdown" % "pegdown" % "1.6.0" % Test,
-      "org.osgi" % "org.osgi.core" % "5.0.0" % Provided) ++ akkaTestDeps.map(_ % AkkaVersion % Test)
+    "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % AlpakkaVersion,
+    "com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
+    "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
+    "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
+    "org.scala-lang.modules" %% "scala-collection-compat" % "2.4.4",
+    Logback % Test,
+    "org.scalatest" %% "scalatest" % "3.2.11" % Test,
+    "org.pegdown" % "pegdown" % "1.6.0" % Test,
+    "org.osgi" % "org.osgi.core" % "5.0.0" % Provided) ++ akkaTestDeps.map(_ % AkkaVersion % Test)
 
   val exampleDependencies = Seq(
     Logback,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org