Posted to commits@pekko.apache.org by fa...@apache.org on 2022/11/15 19:38:16 UTC

[incubator-pekko-persistence-cassandra] branch main updated (508580a -> 7166529)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git


 discard 508580a  unprotect main branch so merge commit can be removed (#6)
 discard c0653cb  Replace travis references with github actions (#5)
 discard ab37717  Remove circleci config (#4)
    omit 393ab5c  try to fix CI build (#3)
    omit 84c1b74  format source with scalafmt, #2
    omit 94537e8  Add .gitattributes to enforce unix line endings
    omit c6d05ba  Update scalafmt
    omit bf86102  Merge pull request #1 from pjfanning/asf-yaml
     new a3b78b8  Update scalafmt
     new 955b84d  Add .gitattributes to enforce unix line endings
     new 9b6b9f1  format source with scalafmt, #2
     new 65de2a6  try to fix CI build (#3)
     new 81308c4  Remove circleci config (#4)
     new 7d943ea  Replace travis references with github actions (#5)
     new 7166529  unprotect main branch so merge commit can be removed (#6)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change and the repository ends up
containing a history like this:

 * -- * -- B -- O -- O -- O   (508580a)
            \
             N -- N -- N   refs/heads/main (7166529)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
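
For context, a rewrite like the one diagrammed above is usually the result of
rebasing the branch locally and then force-pushing it. A minimal sketch of the
commands involved (assuming the conventional remote name "origin"; <base-commit>
is a placeholder for the common base B shown in the diagram):

    # rewrite the commits that sit on top of the common base B
    git rebase -i <base-commit>
    # replace the old O revisions on the remote with the rewritten N revisions
    git push --force origin main

Using --force-with-lease instead of --force is the safer variant, but either
form produces the "discard"/"new" pattern listed above.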


Summary of changes:


---------------------------------------------------------------------


[incubator-pekko-persistence-cassandra] 01/07: Update scalafmt

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit a3b78b8b5855e38d53dc682a5e99bbd3e36c36fb
Author: Matthew de Detrich <ma...@aiven.io>
AuthorDate: Sat Nov 12 07:38:56 2022 +0100

    Update scalafmt
---
 .scalafmt.conf      | 58 ++++++++++++++++++++++++++++++++++++-----------------
 project/plugins.sbt |  2 +-
 2 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/.scalafmt.conf b/.scalafmt.conf
index f1c9cfd..2ca4ece 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,20 +1,40 @@
-version = 2.1.0
-
-style = defaultWithAlign
-
-docstrings                 = JavaDoc
-indentOperator             = spray
-maxColumn                  = 120
-rewrite.rules              = [RedundantParens, SortImports, AvoidInfix]
-unindentTopLevelOperators  = true
-align.tokens               = [{code = "=>", owner = "Case"}]
-align.openParenDefnSite    = false
-align.openParenCallSite    = false
-optIn.breakChainOnFirstMethodDot = false
-optIn.configStyleArguments = false
-danglingParentheses = false
-spaces.inImportCurlyBraces = true
-rewrite.neverInfix.excludeFilters = [
+version                                  = 3.6.1
+runner.dialect                           = scala213
+project.git                              = true
+style                                    = defaultWithAlign
+docstrings.style                         = Asterisk
+docstrings.wrap                          = false
+indentOperator.preset                    = spray
+maxColumn                                = 120
+lineEndings                              = preserve
+rewrite.rules                            = [RedundantParens, SortImports, AvoidInfix]
+indentOperator.exemptScope               = all
+align.preset                             = some
+align.tokens."+"                         = [
+  {
+    code   = "~>"
+    owners = [
+      { regex = "Term.ApplyInfix" }
+    ]
+  }
+]
+literals.hexDigits                       = upper
+literals.hexPrefix                       = lower
+binPack.unsafeCallSite                   = always
+binPack.unsafeDefnSite                   = always
+binPack.indentCallSiteSingleArg          = false
+binPack.indentCallSiteOnce               = true
+newlines.avoidForSimpleOverflow          = [slc]
+newlines.source                          = keep
+newlines.beforeMultiline                 = keep
+align.openParenDefnSite                  = false
+align.openParenCallSite                  = false
+align.allowOverflow                      = true
+optIn.breakChainOnFirstMethodDot         = false
+optIn.configStyleArguments               = false
+danglingParentheses.preset               = false
+spaces.inImportCurlyBraces               = true
+rewrite.neverInfix.excludeFilters        = [
   and
   min
   max
@@ -44,9 +64,11 @@ rewrite.neverInfix.excludeFilters = [
   allElementsOf
   inOrderElementsOf
   theSameElementsAs
+  theSameElementsInOrderAs
 ]
-rewriteTokens = {
+rewriteTokens          = {
   "⇒": "=>"
   "→": "->"
   "←": "<-"
 }
+project.layout         = StandardConvention
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 2d34441..7af20b4 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,5 +1,5 @@
 addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.5")
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.1")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
 addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
 addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")


---------------------------------------------------------------------


[incubator-pekko-persistence-cassandra] 03/07: format source with scalafmt, #2

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit 9b6b9f1850463f8061545cccc869b848154344dd
Author: Auto Format <nobody>
AuthorDate: Sat Nov 12 07:40:57 2022 +0100

    format source with scalafmt, #2
---
 build.sbt                                          |  76 ++---
 .../cassandra/testkit/CassandraLauncher.scala      |  23 +-
 .../cassandra/EventsByTagMigration.scala           |   1 -
 .../cassandra/KeyspaceAndTableStatements.scala     |   1 -
 .../persistence/cassandra/cleanup/Cleanup.scala    |   1 -
 .../compaction/BaseCompactionStrategy.scala        |   3 +-
 .../compaction/LeveledCompactionStrategy.scala     |   3 +-
 .../compaction/SizeTieredCompactionStrategy.scala  |   2 +-
 .../persistence/cassandra/journal/Buffer.scala     |   4 +-
 .../cassandra/journal/CassandraJournal.scala       |  17 +-
 .../journal/CassandraJournalStatements.scala       |   4 +-
 .../cassandra/journal/CassandraTagRecovery.scala   |  17 +-
 .../cassandra/journal/PubSubThrottler.scala        |   2 +-
 .../persistence/cassandra/journal/TagWriter.scala  |   2 +-
 .../persistence/cassandra/journal/TagWriters.scala |   7 +-
 .../query/EventsByPersistenceIdStage.scala         |   1 -
 .../cassandra/query/EventsByTagStage.scala         |  25 +-
 .../query/TagViewSequenceNumberScanner.scala       |   4 +-
 .../query/javadsl/CassandraReadJournal.scala       |   2 -
 .../akka/persistence/cassandra/query/package.scala |   8 +-
 .../query/scaladsl/CassandraReadJournal.scala      |   5 +-
 .../snapshot/CassandraSnapshotStore.scala          |  12 +-
 .../cassandra/MultiNodeClusterSpec.scala           |  87 +++--
 .../cassandra/CassandraEventsByTagLoadSpec.scala   |  21 +-
 .../akka/persistence/cassandra/CassandraSpec.scala |   6 +-
 .../cassandra/EventsByTagMigrationSpec.scala       |  32 +-
 .../akka/persistence/cassandra/RetriesSpec.scala   |   6 +-
 .../cassandra/cleanup/CleanupSpec.scala            |  22 +-
 .../CassandraCompactionStrategySpec.scala          |   8 +-
 .../cassandra/query/EventAdaptersReadSpec.scala    |  45 +--
 .../cassandra/query/EventsByTagSpec.scala          | 365 +++++++++++----------
 .../cassandra/testkit/CassandraLauncherSpec.scala  |   4 +-
 .../AllPersistenceIdsMigrationCompileOnly.scala    |   4 +-
 .../doc/reconciler/ReconciliationCompileOnly.scala |   4 +-
 .../test/scala/doc/cleanup/CleanupDocExample.scala |   4 +-
 .../cassandra/example/EventProcessorStream.scala   |  34 +-
 .../akka/persistence/cassandra/example/Main.scala  |  68 ++--
 .../persistence/cassandra/example/ReadSide.scala   |  15 +-
 project/Common.scala                               |  46 +--
 project/Dependencies.scala                         |  18 +-
 40 files changed, 513 insertions(+), 496 deletions(-)

diff --git a/build.sbt b/build.sbt
index 1e39948..512dd39 100644
--- a/build.sbt
+++ b/build.sbt
@@ -27,7 +27,7 @@ lazy val core = project
     name := "akka-persistence-cassandra",
     libraryDependencies ++= Dependencies.akkaPersistenceCassandraDependencies,
     Compile / packageBin / packageOptions += Package.ManifestAttributes(
-        "Automatic-Module-Name" -> "akka.persistence.cassandra"))
+      "Automatic-Module-Name" -> "akka.persistence.cassandra"))
   .configs(MultiJvm)
 
 lazy val cassandraLauncher = project
@@ -48,7 +48,7 @@ lazy val cassandraBundle = project
     crossPaths := false,
     autoScalaLibrary := false,
     libraryDependencies += ("org.apache.cassandra" % "cassandra-all" % "3.11.3")
-        .exclude("commons-logging", "commons-logging"),
+      .exclude("commons-logging", "commons-logging"),
     dependencyOverrides += "com.github.jbellis" % "jamm" % "0.3.3", // See jamm comment in https://issues.apache.org/jira/browse/CASSANDRA-9608
     assembly / target := target.value / "bundle" / "akka" / "persistence" / "cassandra" / "launcher",
     assembly / assemblyJarName := "cassandra-bundle.jar")
@@ -69,19 +69,19 @@ lazy val endToEndExample = project
     dockerUsername := Some("kubakka"),
     dockerUpdateLatest := true,
     // update if deploying to some where that can't see docker hu
-    //dockerRepository := Some("some-registry"),
+    // dockerRepository := Some("some-registry"),
     dockerCommands ++= Seq(
-        Cmd("USER", "root"),
-        Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", "busybox-extras", "curl", "iptables"),
-        Cmd(
-          "RUN",
-          "/sbin/apk",
-          "add",
-          "--no-cache",
-          "jattach",
-          "--repository",
-          "http://dl-cdn.alpinelinux.org/alpine/edge/community/"),
-        Cmd("RUN", "chgrp -R 0 . && chmod -R g=u .")),
+      Cmd("USER", "root"),
+      Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", "busybox-extras", "curl", "iptables"),
+      Cmd(
+        "RUN",
+        "/sbin/apk",
+        "add",
+        "--no-cache",
+        "jattach",
+        "--repository",
+        "http://dl-cdn.alpinelinux.org/alpine/edge/community/"),
+      Cmd("RUN", "chgrp -R 0 . && chmod -R g=u .")),
     // Docker image is only for running in k8s
     Universal / javaOptions ++= Seq("-J-Dconfig.resource=kubernetes.conf"))
   .enablePlugins(DockerPlugin, JavaAppPackaging)
@@ -104,30 +104,30 @@ lazy val docs = project
     Preprocess / sourceDirectory := (LocalRootProject / ScalaUnidoc / unidoc / target).value,
     Paradox / siteSubdirName := s"docs/akka-persistence-cassandra/${projectInfoVersion.value}",
     Compile / paradoxProperties ++= Map(
-        "project.url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current/",
-        "canonical.base_url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current",
-        "akka.version" -> Dependencies.AkkaVersion,
-        // Akka
-        "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
-        "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
-        "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
-        // Alpakka
-        "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
-        "scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
-        "javadoc.akka.stream.alpakka.base_url" -> "",
-        // APC 0.x
-        "extref.apc-0.x.base_url" -> s"https://doc.akka.io/docs/akka-persistence-cassandra/0.103/%s",
-        // Cassandra
-        "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
-        // Datastax Java driver
-        "extref.java-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.DriverVersionInDocs}/%s",
-        "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.DriverVersionInDocs}/",
-        // Java
-        "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
-        // Scala
-        "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
-        "scaladoc.akka.persistence.cassandra.base_url" -> s"/${(Preprocess / siteSubdirName).value}/",
-        "javadoc.akka.persistence.cassandra.base_url" -> ""), // no Javadoc is published
+      "project.url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current/",
+      "canonical.base_url" -> "https://doc.akka.io/docs/akka-persistence-cassandra/current",
+      "akka.version" -> Dependencies.AkkaVersion,
+      // Akka
+      "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s",
+      "scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
+      "javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
+      // Alpakka
+      "extref.alpakka.base_url" -> s"https://doc.akka.io/docs/alpakka/${Dependencies.AlpakkaVersionInDocs}/%s",
+      "scaladoc.akka.stream.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${Dependencies.AlpakkaVersionInDocs}/",
+      "javadoc.akka.stream.alpakka.base_url" -> "",
+      // APC 0.x
+      "extref.apc-0.x.base_url" -> s"https://doc.akka.io/docs/akka-persistence-cassandra/0.103/%s",
+      // Cassandra
+      "extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
+      // Datastax Java driver
+      "extref.java-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.DriverVersionInDocs}/%s",
+      "javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.DriverVersionInDocs}/",
+      // Java
+      "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/",
+      // Scala
+      "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/${scalaBinaryVersion.value}.x/",
+      "scaladoc.akka.persistence.cassandra.base_url" -> s"/${(Preprocess / siteSubdirName).value}/",
+      "javadoc.akka.persistence.cassandra.base_url" -> ""), // no Javadoc is published
     paradoxGroups := Map("Language" -> Seq("Java", "Scala")),
     ApidocPlugin.autoImport.apidocRootPackage := "akka",
     resolvers += Resolver.jcenterRepo,
diff --git a/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala b/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
index 5e4fc35..ff82c8c 100644
--- a/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
+++ b/cassandra-launcher/src/main/scala/akka/persistence/cassandra/testkit/CassandraLauncher.scala
@@ -260,7 +260,7 @@ object CassandraLauncher {
 
       println(
         s"Starting Cassandra on port client port: $realPort storage port $storagePort host $realHost java version ${System
-          .getProperty("java.runtime.version")}")
+            .getProperty("java.runtime.version")}")
 
       // http://wiki.apache.org/cassandra/StorageConfiguration
       val conf = readResource(configResource)
@@ -400,16 +400,17 @@ object CassandraLauncher {
     val deadline = AwaitListenTimeout.fromNow
     @annotation.tailrec
     def tryConnect(): Unit = {
-      val retry = try {
-        new Socket(host, port).close()
-        false
-      } catch {
-        case _: IOException if deadline.hasTimeLeft() =>
-          Thread.sleep(AwaitListenPoll.toMillis)
-          true
-        case ioe: IOException =>
-          throw new RuntimeException(s"Cassandra did not start within $AwaitListenTimeout", ioe)
-      }
+      val retry =
+        try {
+          new Socket(host, port).close()
+          false
+        } catch {
+          case _: IOException if deadline.hasTimeLeft() =>
+            Thread.sleep(AwaitListenPoll.toMillis)
+            true
+          case ioe: IOException =>
+            throw new RuntimeException(s"Cassandra did not start within $AwaitListenTimeout", ioe)
+        }
       if (retry) tryConnect()
     }
     tryConnect()
diff --git a/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala b/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
index b063d05..390bc3a 100644
--- a/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/EventsByTagMigration.scala
@@ -68,7 +68,6 @@ object EventsByTagMigration {
 }
 
 /**
- *
  * @param pluginConfigPath The config namespace where the plugin is configured, default is `akka.persistence.cassandra`
  */
 class EventsByTagMigration(
diff --git a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
index e1394d9..9586da3 100644
--- a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala
@@ -31,7 +31,6 @@ class KeyspaceAndTableStatements(
    *
    * This can be queried in for example a startup script without accessing the actual
    * Cassandra plugin actor.
-   *
    */
   def createJournalTablesStatements: immutable.Seq[String] =
     journalStatements.createTable ::
diff --git a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
index aeb2618..b61cb09 100644
--- a/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/cleanup/Cleanup.scala
@@ -131,7 +131,6 @@ final class Cleanup(systemProvider: ClassicActorSystemProvider, settings: Cleanu
    * snapshot.
    *
    * @return the snapshot meta of the oldest remaining snapshot. None if there are no snapshots
-   *
    */
   def deleteBeforeSnapshot(
       persistenceId: String,
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
index 71841bb..7f2239a 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala
@@ -16,7 +16,8 @@ abstract class BaseCompactionStrategy(config: Config, className: String, propert
   require(config.hasPath("class") && config.getString("class") == className, s"Config does not specify a $className")
   require(
     config.entrySet().asScala.map(_.getKey).forall(propertyKeys.contains(_)),
-    s"Config contains properties not supported by a $className. Supported: $propertyKeys. Supplied: ${config.entrySet().asScala.map(_.getKey)}")
+    s"Config contains properties not supported by a $className. Supported: $propertyKeys. Supplied: ${config.entrySet().asScala.map(
+        _.getKey)}")
 
   val enabled: Boolean =
     if (config.hasPath("enabled")) config.getBoolean("enabled") else true
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
index a4be050..db7c05f 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/LeveledCompactionStrategy.scala
@@ -10,7 +10,8 @@ import com.typesafe.config.Config
  * https://github.com/apache/cassandra/blob/cassandra-2.2/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
  */
 class LeveledCompactionStrategy(config: Config)
-    extends BaseCompactionStrategy(config, LeveledCompactionStrategy.ClassName, LeveledCompactionStrategy.propertyKeys) {
+    extends BaseCompactionStrategy(config, LeveledCompactionStrategy.ClassName,
+      LeveledCompactionStrategy.propertyKeys) {
   val ssTableSizeInMB: Long =
     if (config.hasPath("sstable_size_in_mb"))
       config.getLong("sstable_size_in_mb")
diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
index fb02ff1..d6108de 100644
--- a/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/compaction/SizeTieredCompactionStrategy.scala
@@ -51,7 +51,7 @@ object SizeTieredCompactionStrategy extends CassandraCompactionStrategyConfig[Si
 
   override def propertyKeys: List[String] =
     (BaseCompactionStrategy.propertyKeys ++
-    List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted
+      List("bucket_high", "bucket_low", "max_threshold", "min_threshold", "min_sstable_size")).sorted
 
   override def fromConfig(config: Config): SizeTieredCompactionStrategy =
     new SizeTieredCompactionStrategy(config)
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
index 0e8d2c8..2c2aa2b 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/Buffer.scala
@@ -65,8 +65,8 @@ private[akka] case class Buffer(
         // add them to pending, any time bucket changes will be detected later
         copy(size = newSize, pending = pending :+ write)
       } else if (nextBatch.headOption.exists(oldestEvent =>
-                   UUIDComparator.comparator
-                     .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
+          UUIDComparator.comparator
+            .compare(write.events.head._1.timeUuid, oldestEvent.events.head._1.timeUuid) < 0)) {
         // rare case where events have been received out of order, just re-build the buffer
         require(pending.isEmpty)
         val allWrites = (nextBatch :+ write).sortBy(_.events.head._1.timeUuid)(timeUuidOrdering)
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
index 9f6de3b..cb601de 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala
@@ -6,7 +6,7 @@ package akka.persistence.cassandra.journal
 
 import java.lang.{ Long => JLong }
 import java.nio.ByteBuffer
-import java.util.{ UUID, HashMap => JHMap, Map => JMap }
+import java.util.{ HashMap => JHMap, Map => JMap, UUID }
 
 import akka.Done
 import akka.actor.SupervisorStrategy.Stop
@@ -266,7 +266,7 @@ import akka.stream.scaladsl.Source
             writeMessages(serialized)
           } else {
 
-            //if presistAll was used, single AtomicWrite can already contain complete batch, so we need to regroup writes correctly
+            // if presistAll was used, single AtomicWrite can already contain complete batch, so we need to regroup writes correctly
             val groups: List[List[SerializedAtomicWrite]] = groupedWrites(serialized.toList.reverse, Nil, Nil)
 
             // execute the groups in sequence
@@ -301,7 +301,7 @@ import akka.stream.scaladsl.Source
     toReturn
   }
 
-  //Regroup batches by payload size
+  // Regroup batches by payload size
   @tailrec
   private def groupedWrites(
       reversed: List[SerializedAtomicWrite],
@@ -639,12 +639,11 @@ import akka.stream.scaladsl.Source
     val boundSelectHighestSequenceNr = preparedSelectHighestSequenceNr.map(_.bind(persistenceId, partitionNr: JLong))
     boundSelectHighestSequenceNr
       .flatMap(selectOne)
-      .map(
-        row =>
-          row
-            .map(s =>
-              PartitionInfo(partitionNr, minSequenceNr(partitionNr), math.min(s.getLong("sequence_nr"), maxSequenceNr)))
-            .getOrElse(PartitionInfo(partitionNr, minSequenceNr(partitionNr), -1)))
+      .map(row =>
+        row
+          .map(s =>
+            PartitionInfo(partitionNr, minSequenceNr(partitionNr), math.min(s.getLong("sequence_nr"), maxSequenceNr)))
+          .getOrElse(PartitionInfo(partitionNr, minSequenceNr(partitionNr), -1)))
   }
 
   private def asyncHighestDeletedSequenceNumber(persistenceId: String): Future[Long] = {
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
index dc693c9..06e3604 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala
@@ -76,8 +76,8 @@ import akka.persistence.cassandra.FutureDone
       |  WITH gc_grace_seconds =${eventsByTagSettings.tagTable.gcGraceSeconds}
       |  AND compaction = ${indent(eventsByTagSettings.tagTable.compactionStrategy.asCQL, "    ")}
       |  ${if (eventsByTagSettings.tagTable.ttl.isDefined)
-         "AND default_time_to_live = " + eventsByTagSettings.tagTable.ttl.get.toSeconds
-       else ""}
+        "AND default_time_to_live = " + eventsByTagSettings.tagTable.ttl.get.toSeconds
+      else ""}
     """.stripMargin.trim
 
   def createTagsProgressTable: String =
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
index a444f64..6ea4c40 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraTagRecovery.scala
@@ -85,15 +85,14 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
     else {
       val completed: List[Future[Done]] =
         tpr.tags.toList
-          .map(
-            tag =>
-              tag -> serializeEvent(
-                tpr.pr,
-                tpr.tags,
-                tpr.offset,
-                settings.eventsByTagSettings.bucketSize,
-                serialization,
-                system))
+          .map(tag =>
+            tag -> serializeEvent(
+              tpr.pr,
+              tpr.tags,
+              tpr.offset,
+              settings.eventsByTagSettings.bucketSize,
+              serialization,
+              system))
           .map {
             case (tag, serializedFut) =>
               serializedFut.map { serialized =>
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala b/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
index 0cc9a39..a947ade 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/PubSubThrottler.scala
@@ -27,7 +27,7 @@ import akka.annotation.InternalApi
   def receive = {
     case Tick =>
       for ((msg, clients) <- repeated;
-           client <- clients) {
+        client <- clients) {
         delegate.tell(msg, client)
       }
       seen.clear()
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
index c74630a..93b81f2 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriter.scala
@@ -344,7 +344,7 @@ import scala.util.{ Failure, Success, Try }
               throw new IllegalStateException(
                 s"Expected events to be ordered by seqNr. ${event.persistenceId} " +
                 s"Events: ${writes.nextBatch.map(e =>
-                  (e.events.head._1.persistenceId, e.events.head._1.sequenceNr, e.events.head._1.timeUuid))}")
+                    (e.events.head._1.persistenceId, e.events.head._1.sequenceNr, e.events.head._1.timeUuid))}")
 
             acc + (event.persistenceId -> PidProgress(from, event.sequenceNr, tagPidSequenceNr, event.timeUuid))
           case None =>
diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
index f5b1c14..4374cd9 100644
--- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala
@@ -106,10 +106,9 @@ import scala.util.Try
     def writeProgress(tag: Tag, persistenceId: String, seqNr: Long, tagPidSequenceNr: Long, offset: UUID)(
         implicit ec: ExecutionContext): Future[Done] = {
       WriteTagProgress
-        .map(
-          ps =>
-            ps.bind(persistenceId, tag, seqNr: JLong, tagPidSequenceNr: JLong, offset)
-              .setExecutionProfileName(writeProfile))
+        .map(ps =>
+          ps.bind(persistenceId, tag, seqNr: JLong, tagPidSequenceNr: JLong, offset)
+            .setExecutionProfileName(writeProfile))
         .flatMap(executeWrite)
     }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index edcfe8e..f8d1471 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -452,7 +452,6 @@ import akka.persistence.cassandra.PluginSettings
             }
 
           case QueryIdle | _: QueryInProgress | _: QueryResult => // ok
-
         }
       }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
index ddc3882..ecbc797 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala
@@ -99,10 +99,10 @@ import scala.compat.java8.FutureConverters._
         implicit ec: ExecutionContext,
         scheduler: Scheduler): Future[AsyncResultSet] = {
       Retries.retry({ () =>
-        val bound =
-          statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
-        session.executeAsync(bound).toScala
-      }, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
+          val bound =
+            statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
+          session.executeAsync(bound).toScala
+        }, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
     }
   }
 
@@ -121,7 +121,6 @@ import scala.compat.java8.FutureConverters._
   final case class MissingData(maxOffset: UUID, maxSequenceNr: TagPidSequenceNr)
 
   /**
-   *
    * @param queryPrevious Should the previous bucket be queries right away. Searches go back one bucket, first the current bucket then
    *                      the previous bucket. This is repeated every refresh interval.
    * @param gapDetected Whether an explicit gap has been detected e.g. events 1-4 have been seen then the next event is not 5.
@@ -190,7 +189,8 @@ import scala.compat.java8.FutureConverters._
 
     // override to give nice offset formatting
     override def toString: String =
-      s"""StageState(state: $state, fromOffset: ${formatOffset(fromOffset)}, toOffset: ${formatOffset(toOffset)}, tagPidSequenceNrs: $tagPidSequenceNrs, missingLookup: $missingLookup, bucketSize: $bucketSize)"""
+      s"""StageState(state: $state, fromOffset: ${formatOffset(fromOffset)}, toOffset: ${formatOffset(
+          toOffset)}, tagPidSequenceNrs: $tagPidSequenceNrs, missingLookup: $missingLookup, bucketSize: $bucketSize)"""
   }
 
   private val uuidRowOrdering = new Ordering[UUIDRow] {
@@ -377,9 +377,10 @@ import scala.compat.java8.FutureConverters._
       }
 
       override def preStart(): Unit = {
-        stageState = StageState(QueryIdle, initialQueryOffset, calculateToOffset(), initialTagPidSequenceNrs.transform {
-          case (_, (tagPidSequenceNr, offset)) => (tagPidSequenceNr, offset, System.currentTimeMillis())
-        }, delayedScanInProgress = false, System.currentTimeMillis(), None, bucketSize)
+        stageState = StageState(QueryIdle, initialQueryOffset, calculateToOffset(),
+          initialTagPidSequenceNrs.transform {
+            case (_, (tagPidSequenceNr, offset)) => (tagPidSequenceNr, offset, System.currentTimeMillis())
+          }, delayedScanInProgress = false, System.currentTimeMillis(), None, bucketSize)
         if (log.isInfoEnabled) {
           log.info(
             s"[{}]: EventsByTag query [${session.tag}] starting with EC delay {}ms: fromOffset [{}] toOffset [{}]",
@@ -525,7 +526,8 @@ import scala.compat.java8.FutureConverters._
           .selectEventsForBucket(
             stageState.currentTimeBucket,
             stageState.fromOffset,
-            stageState.toOffset, { (attempt, t, nextRetry) =>
+            stageState.toOffset,
+            { (attempt, t, nextRetry) =>
               if (log.isWarningEnabled) {
                 log.warning(
                   s"[{}] Query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ${attempt}. Next retry in: ${nextRetry.pretty}. Reason: ${t.getMessage}",
@@ -566,7 +568,8 @@ import scala.compat.java8.FutureConverters._
             .selectEventsForBucket(
               missing.bucket,
               missing.minOffset,
-              missing.maxOffset, { (attempt, t, nextRetry) =>
+              missing.maxOffset,
+              { (attempt, t, nextRetry) =>
                 if (log.isWarningEnabled) {
                   log.warning(
                     s"[{}] Looking for missing query failed. timeBucket: {} from offset: {} to offset: {}. Attempt ${attempt}. Next retry in: ${nextRetry.pretty}. Reason: ${t.getMessage}",
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
index 8102598..60b34f1 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala
@@ -67,8 +67,8 @@ import akka.stream.scaladsl.Sink
       toOffset: UUID,
       bucketSize: BucketSize,
       scanningPeriod: FiniteDuration,
-      whichToKeep: (TagPidSequenceNr, TagPidSequenceNr) => TagPidSequenceNr)
-      : Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
+      whichToKeep: (TagPidSequenceNr,
+          TagPidSequenceNr) => TagPidSequenceNr): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
 
     def doIt(): Future[Map[PersistenceId, (TagPidSequenceNr, UUID)]] = {
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
index 8b90198..216f47e 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala
@@ -44,7 +44,6 @@ object CassandraReadJournal {
  * Configuration settings can be defined in the configuration section with the
  * absolute path corresponding to the identifier, which is `"akka.persistence.cassandra.query"`
  * for the default [[CassandraReadJournal#Identifier]]. See `reference.conf`.
- *
  */
 class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query.scaladsl.CassandraReadJournal)
     extends ReadJournal
@@ -154,7 +153,6 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
    *
    * Use `NoOffset` when you want all events from the beginning of time.
    * To acquire an offset from a long unix timestamp to use with this query, you can use [[timeBasedUUIDFrom]].
-   *
    */
   override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
     scaladslReadJournal.currentEventsByTag(tag, offset).asJava
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/package.scala b/core/src/main/scala/akka/persistence/cassandra/query/package.scala
index e143ffb..0c6ec42 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/package.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/package.scala
@@ -35,10 +35,10 @@ package object query {
       val timestamp = (time - uuidEpoch) * 10000
 
       var msb = 0L
-      msb |= (0X00000000FFFFFFFFL & timestamp) << 32
-      msb |= (0X0000FFFF00000000L & timestamp) >>> 16
-      msb |= (0X0FFF000000000000L & timestamp) >>> 48
-      msb |= 0X0000000000001000L // sets the version to 1.
+      msb |= (0x00000000FFFFFFFFL & timestamp) << 32
+      msb |= (0x0000FFFF00000000L & timestamp) >>> 16
+      msb |= (0x0FFF000000000000L & timestamp) >>> 48
+      msb |= 0x0000000000001000L // sets the version to 1.
       msb
     }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
index 7045cb8..e4ed50d 100644
--- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
@@ -479,7 +479,6 @@ class CassandraReadJournal protected (
    *
    * Use `NoOffset` when you want all events from the beginning of time.
    * To acquire an offset from a long unix timestamp to use with this query, you can use [[timeBasedUUIDFrom]].
-   *
    */
   override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
     currentEventsByTagInternal(tag, offset)
@@ -654,8 +653,8 @@ class CassandraReadJournal protected (
             fastForwardEnabled))
         .named(name)
     }.mapAsync(querySettings.deserializationParallelism) { row =>
-        extractor.extract(row, deserializeEventAsync)
-      }
+      extractor.extract(row, deserializeEventAsync)
+    }
       .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
   }
 
diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index bf61c26..46b71aa 100644
--- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -193,20 +193,22 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi
       }
     }
 
-  /** Plugin API: deletes all snapshots matching `criteria`. This call is protected with a circuit-breaker.
+  /**
+   * Plugin API: deletes all snapshots matching `criteria`. This call is protected with a circuit-breaker.
    *
    * @param persistenceId id of the persistent actor.
    * @param criteria selection criteria for deleting. If no timestamp constraints are specified this routine
    * @note Due to the limitations of Cassandra deletion requests, this routine makes an initial query in order to obtain the
    * records matching the criteria which are then deleted in a batch deletion. Improvements in Cassandra v3.0+ mean a single
    * range deletion on the sequence number is used instead, except if timestamp constraints are specified, which still
-   * requires the original two step routine.*/
+   * requires the original two step routine.
+   */
   override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
     session.serverMetaData.flatMap { meta =>
       if (meta.isVersion2
-          || settings.cosmosDb
-          || 0L < criteria.minTimestamp
-          || criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
+        || settings.cosmosDb
+        || 0L < criteria.minTimestamp
+        || criteria.maxTimestamp < SnapshotSelectionCriteria.latest().maxTimestamp) {
         preparedSelectSnapshotMetadata.flatMap { snapshotMetaPs =>
           // this meta query gets slower than slower if snapshots are deleted without a criteria.minSequenceNr as
           // all previous tombstones are scanned in the meta data query
diff --git a/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala b/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
index 6a162fd..ee800c6 100644
--- a/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
+++ b/core/src/multi-jvm/scala/akka/cluster/persistence/cassandra/MultiNodeClusterSpec.scala
@@ -3,16 +3,16 @@ package akka.cluster.persistence.cassandra
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 
-import akka.actor.{Actor, ActorRef, ActorSystem, Address, RootActorPath}
-import akka.cluster.{Cluster, ClusterReadView, MemberStatus, _}
+import akka.actor.{ Actor, ActorRef, ActorSystem, Address, RootActorPath }
+import akka.cluster.{ Cluster, ClusterReadView, MemberStatus, _ }
 import akka.event.Logging.ErrorLevel
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{FlightRecordingSupport, MultiNodeSpec}
+import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec }
 import akka.testkit.TestEvent._
 import akka.testkit._
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
 import org.scalatest.exceptions.TestCanceledException
-import org.scalatest.{Canceled, Outcome, Suite}
+import org.scalatest.{ Canceled, Outcome, Suite }
 
 import scala.collection.immutable
 import scala.concurrent.duration._
@@ -21,8 +21,9 @@ import scala.language.implicitConversions
 object MultiNodeClusterSpec {
 
   def clusterConfigWithFailureDetectorPuppet: Config =
-    ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").
-      withFallback(clusterConfig)
+    ConfigFactory.parseString(
+      "akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").withFallback(
+      clusterConfig)
 
   def clusterConfig(failureDetectorPuppet: Boolean): Config =
     if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig
@@ -72,20 +73,20 @@ object MultiNodeClusterSpec {
   class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
     import EndActor._
     def receive: Receive = {
-      case SendEnd ⇒
-        target foreach { t ⇒
+      case SendEnd =>
+        target.foreach { t =>
           context.actorSelection(RootActorPath(t) / self.path.elements) ! End
         }
-      case End ⇒
-        testActor forward End
+      case End =>
+        testActor.forward(End)
         sender() ! EndAck
-      case EndAck ⇒
-        testActor forward EndAck
+      case EndAck =>
+        testActor.forward(EndAck)
     }
   }
 }
 
-trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordingSupport { self: MultiNodeSpec ⇒
+trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordingSupport { self: MultiNodeSpec =>
 
   override def initialParticipants = roles.size
 
@@ -111,9 +112,9 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
         ".*Cluster Node.* - is starting up.*",
         ".*Shutting down cluster Node.*",
         ".*Cluster node successfully shut down.*",
-        ".*Using a dedicated scheduler for cluster.*") foreach { s ⇒
-          sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
-        }
+        ".*Using a dedicated scheduler for cluster.*").foreach { s =>
+        sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
+      }
 
       muteDeadLetters(
         classOf[akka.actor.PoisonPill],
@@ -154,11 +155,11 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   implicit def address(role: RoleName): Address = {
     cachedAddresses.get(role) match {
-      case null ⇒
+      case null =>
         val address = node(role).address
         cachedAddresses.put(role, address)
         address
-      case address ⇒ address
+      case address => address
     }
   }
 
@@ -188,7 +189,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   def startClusterNode(): Unit = {
     if (clusterView.members.isEmpty) {
-      cluster join myself
+      cluster.join(myself)
       awaitAssert(clusterView.members.map(_.address) should contain(address(myself)))
     } else
       clusterView.self
@@ -221,19 +222,20 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   def joinWithin(joinNode: RoleName, max: Duration = remainingOrDefault, interval: Duration = 1.second): Unit = {
     def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
-      clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }
+      clusterView.members.exists { m => (m.address == member) && status.contains(m.status) }
 
     cluster.join(joinNode)
-    awaitCond({
-      clusterView.refreshCurrentState()
-      if (memberInState(joinNode, List(MemberStatus.up)) &&
-        memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
-        true
-      else {
-        cluster.join(joinNode)
-        false
-      }
-    }, max, interval)
+    awaitCond(
+      {
+        clusterView.refreshCurrentState()
+        if (memberInState(joinNode, List(MemberStatus.up)) &&
+          memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
+          true
+        else {
+          cluster.join(joinNode)
+          false
+        }
+      }, max, interval)
   }
 
   /**
@@ -243,7 +245,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
   def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
     val members = gotMembers.toIndexedSeq
     members.size should ===(expectedAddresses.length)
-    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address should ===(a) }
+    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) => members(i).address should ===(a) }
   }
 
   /**
@@ -269,14 +271,14 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    */
   def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
     if (nodesInCluster.contains(myself)) {
-      nodesInCluster.length should not be (0)
+      nodesInCluster.length should not be 0
       val expectedLeader = roleOfLeader(nodesInCluster)
       val leader = clusterView.leader
       val isLeader = leader == Some(clusterView.selfAddress)
       assert(
         isLeader == isNode(expectedLeader),
         "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
-      clusterView.status should (be(MemberStatus.Up) or be(MemberStatus.Leaving))
+      clusterView.status should (be(MemberStatus.Up).or(be(MemberStatus.Leaving)))
     }
 
   /**
@@ -284,17 +286,17 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring.
    */
   def awaitMembersUp(
-    numberOfMembers:          Int,
-    canNotBePartOfMemberRing: Set[Address]   = Set.empty,
-    timeout:                  FiniteDuration = 25.seconds): Unit = {
+      numberOfMembers: Int,
+      canNotBePartOfMemberRing: Set[Address] = Set.empty,
+      timeout: FiniteDuration = 25.seconds): Unit = {
     within(timeout) {
       if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
-        awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) should not contain (a)))
+        awaitAssert(canNotBePartOfMemberRing.foreach(a => clusterView.members.map(_.address) should not contain a))
       awaitAssert(clusterView.members.size should ===(numberOfMembers))
       awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up)))
       // clusterView.leader is updated by LeaderChanged, await that to be updated also
       val expectedLeader = clusterView.members.collectFirst {
-        case m if m.dataCenter == cluster.settings.SelfDataCenter ⇒ m.address
+        case m if m.dataCenter == cluster.settings.SelfDataCenter => m.address
       }
       awaitAssert(clusterView.leader should ===(expectedLeader))
     }
@@ -307,7 +309,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    * Wait until the specified nodes have seen the same gossip overview.
    */
   def awaitSeenSameState(addresses: Address*): Unit =
-    awaitAssert((addresses.toSet diff clusterView.seenBy) should ===(Set.empty))
+    awaitAssert((addresses.toSet.diff(clusterView.seenBy)) should ===(Set.empty))
 
   /**
    * Leader according to the address ordering of the roles.
@@ -318,7 +320,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
    * be determined from the `RoleName`.
    */
   def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
-    nodesInCluster.length should not be (0)
+    nodesInCluster.length should not be 0
     nodesInCluster.sorted.head
   }
 
@@ -332,7 +334,4 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with FlightRecordi
 
   def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)
 
-
 }
-
-
diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
index 0f0fb98..8dcd930 100644
--- a/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/CassandraEventsByTagLoadSpec.scala
@@ -54,7 +54,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
       val readJournal =
         PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
 
-      eventTags.foreach({ tag =>
+      eventTags.foreach { tag =>
         try {
           validateTagStream(readJournal)(tag)
         } catch {
@@ -66,7 +66,7 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
             throw new RuntimeException("Only passed the second time")
         }
 
-      })
+      }
     }
   }
 
@@ -78,14 +78,15 @@ class CassandraEventsByTagLoadSpec extends CassandraSpec(CassandraEventsByTagLoa
     probe.request(messagesPerPersistenceId * nrPersistenceIds)
 
     (1L to (messagesPerPersistenceId * nrPersistenceIds)).foreach { i: Long =>
-      val event = try {
-        probe.expectNext(veryLongWait)
-      } catch {
-        case e: AssertionError =>
-          system.log.error(e, s"Failed to get event: $i")
-          allReceived.filter(_._2.size != messagesPerPersistenceId).foreach(p => system.log.info("{}", p))
-          throw e
-      }
+      val event =
+        try {
+          probe.expectNext(veryLongWait)
+        } catch {
+          case e: AssertionError =>
+            system.log.error(e, s"Failed to get event: $i")
+            allReceived.filter(_._2.size != messagesPerPersistenceId).foreach(p => system.log.info("{}", p))
+            throw e
+        }
 
       allReceived += (event.persistenceId -> (event.sequenceNr :: allReceived(event.persistenceId)))
       var fail = false
diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
index 341aa18..702ee40 100644
--- a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala
@@ -179,8 +179,8 @@ abstract class CassandraSpec(
             .asScala
             .foreach(row => {
               println(s"""Row:${row.getString("tag_name")},${row.getLong("timebucket")},${formatOffset(
-                row.getUuid("timestamp"))},${row.getString("persistence_id")},${row
-                .getLong("tag_pid_sequence_nr")},${row.getLong("sequence_nr")}""")
+                  row.getUuid("timestamp"))},${row.getString("persistence_id")},${row
+                  .getLong("tag_pid_sequence_nr")},${row.getLong("sequence_nr")}""")
 
             })
         }
@@ -190,7 +190,7 @@ abstract class CassandraSpec(
           .asScala
           .foreach(row => {
             println(s"""Row:${row.getLong("partition_nr")}, ${row.getString("persistence_id")}, ${row.getLong(
-              "sequence_nr")}""")
+                "sequence_nr")}""")
           })
 
         println("snapshots")
diff --git a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
index f1c9eb6..23958ba 100644
--- a/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/EventsByTagMigrationSpec.scala
@@ -69,7 +69,7 @@ class EventsByTagMigrationProvidePersistenceIds extends AbstractEventsByTagMigra
     val pidOne = "pOne"
     val pidTwo = "pTwo"
 
-    "support migrating a subset of persistenceIds" taggedAs (RequiresCassandraThree) in {
+    "support migrating a subset of persistenceIds" taggedAs RequiresCassandraThree in {
       writeOldTestEventWithTags(PersistentRepr("e-1", 1, pidOne), Set("blue"))
       writeOldTestEventWithTags(PersistentRepr("e-2", 2, pidOne), Set("blue"))
       writeOldTestEventWithTags(PersistentRepr("f-1", 1, pidTwo), Set("blue"))
@@ -113,7 +113,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
     val pidWithSnapshot = "pidSnapshot"
     val pidExcluded = "pidExcluded"
 
-    "have some existing tagged messages" taggedAs (RequiresCassandraThree) in {
+    "have some existing tagged messages" taggedAs RequiresCassandraThree in {
       // this one uses the 0.7 schema, soo old.
       writeOldTestEventInMessagesColumn(PersistentRepr("e-1", 1L, pidOne), Set("blue", "green", "orange"))
 
@@ -138,7 +138,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       reconciler.rebuildAllPersistenceIds().futureValue
     }
 
-    "allow creation of the new tags view table" taggedAs (RequiresCassandraThree) in {
+    "allow creation of the new tags view table" taggedAs RequiresCassandraThree in {
       migrator.createTables().futureValue shouldEqual Done
     }
 
@@ -146,7 +146,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       migrator.migrateToTagViews(filter = _ != pidExcluded).futureValue shouldEqual Done
     }
 
-    "be idempotent so it can be restarted" taggedAs (RequiresCassandraThree) in {
+    "be idempotent so it can be restarted" taggedAs RequiresCassandraThree in {
       // add some more events to be picked up
       writeOldTestEventWithTags(PersistentRepr("f-1", 1L, pidTwo), Set("green"))
       writeOldTestEventWithTags(PersistentRepr("f-2", 2L, pidTwo), Set("blue"))
@@ -159,11 +159,11 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       reconciler.rebuildAllPersistenceIds().futureValue
     }
 
-    "allow a second migration to resume from where the last one got to" taggedAs (RequiresCassandraThree) in {
+    "allow a second migration to resume from where the last one got to" taggedAs RequiresCassandraThree in {
       migrator.migrateToTagViews(filter = _ != pidExcluded).futureValue shouldEqual Done
     }
 
-    "migrate events missed during the large migration as part of actor recovery" taggedAs (RequiresCassandraThree) in {
+    "migrate events missed during the large migration as part of actor recovery" taggedAs RequiresCassandraThree in {
       // these events mimic the old version still running and persisting events
       writeOldTestEventWithTags(PersistentRepr("f-3", 3L, pidTwo), Set("green"))
       writeOldTestEventWithTags(PersistentRepr("f-4", 4L, pidTwo), Set("blue"))
@@ -172,11 +172,11 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       reconciler.rebuildAllPersistenceIds().futureValue
     }
 
-    "allow adding of the new tags column" taggedAs (RequiresCassandraThree) in {
+    "allow adding of the new tags column" taggedAs RequiresCassandraThree in {
       migrator.addTagsColumn().futureValue shouldEqual Done
     }
 
-    "work with the current implementation" taggedAs (RequiresCassandraThree) in {
+    "work with the current implementation" taggedAs RequiresCassandraThree in {
       val blueSrc: Source[EventEnvelope, NotUsed] = queries.eventsByTag("blue", NoOffset)
       val blueProbe = blueSrc.runWith(TestSink.probe[Any])
       blueProbe.request(5)
@@ -226,7 +226,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       excludedProbe.cancel()
     }
 
-    "see events missed by migration if the persistent actor is started" taggedAs (RequiresCassandraThree) in {
+    "see events missed by migration if the persistent actor is started" taggedAs RequiresCassandraThree in {
       val probe = TestProbe()
       systemTwo.actorOf(TestTaggingActor.props(pidTwo, probe = Some(probe.ref)))
       probe.expectMsg(RecoveryCompleted)
@@ -252,19 +252,19 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
     }
     // This will be left as a manual step for the user as it stops
     // rolling back to the old version
-    "allow dropping of the materialized view" taggedAs (RequiresCassandraThree) in {
+    "allow dropping of the materialized view" taggedAs RequiresCassandraThree in {
       system.log.info("Dropping old materialzied view")
       cluster.execute(SimpleStatement.newInstance(s"DROP MATERIALIZED VIEW $eventsByTagViewName"))
       system.log.info("Dropped old materialzied view")
     }
 
-    "have a peek in the messages table" taggedAs (RequiresCassandraThree) in {
+    "have a peek in the messages table" taggedAs RequiresCassandraThree in {
       val row = cluster.execute(SimpleStatement.newInstance(s"select * from ${messagesTableName} limit 1")).one()
       system.log.debug("New messages table looks like: {}", row)
       system.log.debug("{}", row.getColumnDefinitions)
     }
 
-    "be able to add tags to existing pids" taggedAs (RequiresCassandraThree) in {
+    "be able to add tags to existing pids" taggedAs RequiresCassandraThree in {
       // we need a new actor system for this as the old one will have prepared the statements without
       // the tags column existing
       val pidOnePA = systemTwo.actorOf(TestTaggingActor.props(pidOne, Set("blue", "yellow")))
@@ -283,7 +283,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       blueProbe.expectNextPF {
         case EventEnvelope(_, `pidWithMeta`, 1, "g-1") =>
       }
-      blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 4, "f-4")         => }
+      blueProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 4, "f-4") => }
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 5, "new-event-1") => }
       blueProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 6, "new-event-2") => }
       blueProbe.expectNoMessage(waitTime)
@@ -293,13 +293,13 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
 
     // Again a manual step, leaving them is only wasting disk space
     // the new version will work with these columns still there
-    "allow dropping of tag columns" taggedAs (RequiresCassandraThree) in {
+    "allow dropping of tag columns" taggedAs RequiresCassandraThree in {
       cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag1")
       cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag2")
       cluster.execute(s"ALTER TABLE ${messagesTableName} DROP tag3")
     }
 
-    "still work after dropping the tag columns" taggedAs (RequiresCassandraThree) in {
+    "still work after dropping the tag columns" taggedAs RequiresCassandraThree in {
       val pidTwoPA = systemThree.actorOf(TestTaggingActor.props(pidTwo, Set("orange")))
       pidTwoPA ! "new-event-1"
       expectMsg(Ack)
@@ -309,7 +309,7 @@ class EventsByTagMigrationSpec extends AbstractEventsByTagMigrationSpec {
       val orangeSrc: Source[EventEnvelope, NotUsed] = queriesThree.eventsByTag("orange", NoOffset)
       val orangeProbe = orangeSrc.runWith(TestSink.probe[Any])(SystemMaterializer(systemThree).materializer)
       orangeProbe.request(3)
-      orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1")         => }
+      orangeProbe.expectNextPF { case EventEnvelope(_, `pidOne`, 1, "e-1") => }
       orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 5, "new-event-1") => }
       orangeProbe.expectNextPF { case EventEnvelope(_, `pidTwo`, 6, "new-event-2") => }
       orangeProbe.expectNoMessage(waitTime)
diff --git a/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala b/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
index be88d4d..3241db6 100644
--- a/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/RetriesSpec.scala
@@ -29,9 +29,9 @@ class RetriesSpec
       @volatile var called = 0
       val result = Retries
         .retry(() => {
-          called += 1
-          Future.failed(new RuntimeException(s"cats $called"))
-        }, 3, (_, exc, _) => failProbe.ref ! exc, 1.milli, 2.millis, 0.1)
+            called += 1
+            Future.failed(new RuntimeException(s"cats $called"))
+          }, 3, (_, exc, _) => failProbe.ref ! exc, 1.milli, 2.millis, 0.1)
         .failed
         .futureValue
       called shouldEqual 3
diff --git a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
index 40d85b8..7cc691e 100644
--- a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala
@@ -96,7 +96,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       cleanup.deleteAllEvents(pid, neverUsePersistenceIdAgain = true).futureValue
 
       // also delete from all_persistence_ids
-      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain (pid)
+      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain pid
 
       val p2 = system.actorOf(TestActor.props(pid))
       p2 ! GetRecoveredState
@@ -192,7 +192,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       cleanup.deleteAll(pid, neverUsePersistenceIdAgain = true).futureValue
 
       // also delete from all_persistence_ids
-      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain (pid)
+      queries.currentPersistenceIds().runWith(Sink.seq).futureValue should not contain pid
 
       val p2 = system.actorOf(TestActor.props(pid))
       p2 ! GetRecoveredState
@@ -202,7 +202,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       queries.currentEventsByTag(tag = "tag-b", offset = NoOffset).runWith(Sink.seq).futureValue.size should ===(0)
     }
 
-    "delete some for one persistenceId" taggedAs (RequiresCassandraThree) in {
+    "delete some for one persistenceId" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 8).foreach { i =>
@@ -219,7 +219,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       expectMsg(RecoveredState("", List("evt-6", "evt-7", "evt-8"), 8L))
     }
 
-    "clean up before latest snapshot for one persistence id" taggedAs (RequiresCassandraThree) in {
+    "clean up before latest snapshot for one persistence id" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 3).foreach { i =>
@@ -259,7 +259,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
         .futureValue shouldEqual List("evt-7", "evt-8", "evt-9")
     }
 
-    "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs (RequiresCassandraThree) in {
+    "clean up before snapshot including timestamp that results in all events kept for one persistence id" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 3).foreach { i =>
@@ -299,7 +299,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
         .futureValue shouldEqual List("evt-4", "evt-5", "evt-6", "evt-7", "evt-8", "evt-9")
     }
 
-    "clean up before snapshot including timestamp for one persistence id" taggedAs (RequiresCassandraThree) in {
+    "clean up before snapshot including timestamp for one persistence id" taggedAs RequiresCassandraThree in {
       val pid = nextPid
       val p = system.actorOf(TestActor.props(pid))
       (1 to 3).foreach { i =>
@@ -440,7 +440,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
   }
 
   "Time and snapshot based cleanup" must {
-    "keep the correct  number of snapshots" taggedAs (RequiresCassandraThree) in {
+    "keep the correct  number of snapshots" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -457,7 +457,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
 
       oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000))
     }
-    "keep the all snapshots if fewer than requested without timestamp" taggedAs (RequiresCassandraThree) in {
+    "keep the all snapshots if fewer than requested without timestamp" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -477,7 +477,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
 
       oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 1, 1000))
     }
-    "keep the all snapshots if fewer than requested with timestamp" taggedAs (RequiresCassandraThree) in {
+    "keep the all snapshots if fewer than requested with timestamp" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -512,7 +512,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
       oldestSnapshot shouldEqual None
     }
 
-    "don't delete snapshots newer than the oldest date" taggedAs (RequiresCassandraThree) in {
+    "don't delete snapshots newer than the oldest date" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
@@ -534,7 +534,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting {
 
       oldestSnapshot shouldEqual Some(SnapshotMetadata(pid, 2, 2000))
     }
-    "keep snapshots older than the oldest date to meet snapshotsToKeep" taggedAs (RequiresCassandraThree) in {
+    "keep snapshots older than the oldest date to meet snapshotsToKeep" taggedAs RequiresCassandraThree in {
       val cleanup = new Cleanup(system)
       val pid = nextPid
       writeTestSnapshot(SnapshotMetadata(pid, 1, 1000), "snapshot-1").futureValue
diff --git a/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala b/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
index 3b54095..dd728c3 100644
--- a/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/compaction/CassandraCompactionStrategySpec.scala
@@ -47,7 +47,7 @@ class CassandraCompactionStrategySpec
       compactionStrategy.compactionWindowUnit shouldEqual TimeUnit.DAYS
     }
 
-    "successfully create CQL from TimeWindowCompactionStrategy" taggedAs (RequiresCassandraThree) in {
+    "successfully create CQL from TimeWindowCompactionStrategy" taggedAs RequiresCassandraThree in {
       val twConfig = ConfigFactory.parseString("""journal.table-compaction-strategy {
           | class = "TimeWindowCompactionStrategy"
           | compaction_window_size = 1
@@ -57,7 +57,7 @@ class CassandraCompactionStrategySpec
 
       val cqlExpression =
         s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable1 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
-          twConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+            twConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
 
       noException must be thrownBy {
         cluster.execute(cqlExpression)
@@ -112,7 +112,7 @@ class CassandraCompactionStrategySpec
 
       val cqlExpression =
         s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable2 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
-          uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+            uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
 
       noException must be thrownBy {
         cluster.execute(cqlExpression)
@@ -165,7 +165,7 @@ class CassandraCompactionStrategySpec
 
       val cqlExpression =
         s"CREATE TABLE IF NOT EXISTS testKeyspace.testTable3 (testId TEXT PRIMARY KEY) WITH compaction = ${CassandraCompactionStrategy(
-          uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
+            uniqueConfig.getConfig("journal.table-compaction-strategy")).asCQL}"
 
       noException must be thrownBy {
         cluster.execute(cqlExpression)
diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
index 254452e..1eab4c1 100644
--- a/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/query/EventAdaptersReadSpec.scala
@@ -59,10 +59,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
   "Cassandra query EventsByPersistenceId" must {
 
     "not replay dropped events by the event-adapter" in {
-      setup("a", 6, {
-        case x if x % 2 == 0 => "dropped:"
-        case _               => ""
-      })
+      setup("a", 6,
+        {
+          case x if x % 2 == 0 => "dropped:"
+          case _               => ""
+        })
 
       val src = queries.currentEventsByPersistenceId("a", 0L, Long.MaxValue)
       src
@@ -78,10 +79,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
 
     "replay duplicate events by the event-adapter" in {
 
-      setup("b", 3, {
-        case x if x % 2 == 0 => "duplicated:"
-        case _               => ""
-      })
+      setup("b", 3,
+        {
+          case x if x % 2 == 0 => "duplicated:"
+          case _               => ""
+        })
 
       val src = queries.currentEventsByPersistenceId("b", 0L, Long.MaxValue)
       src.map(_.event).runWith(TestSink.probe[Any]).request(10).expectNext("b-1", "b-2", "b-2", "b-3").expectComplete()
@@ -100,10 +102,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
   "Cassandra query EventsByTag" must {
     "not replay events dropped by the event-adapter" in {
 
-      setup("d", 6, tagged("red") {
-        case x if x % 2 == 0 => "dropped:"
-        case _               => ""
-      })
+      setup("d", 6,
+        tagged("red") {
+          case x if x % 2 == 0 => "dropped:"
+          case _               => ""
+        })
 
       val src = queries.eventsByTag("red", NoOffset)
       val sub = src.map(_.event).runWith(TestSink.probe[Any])
@@ -115,10 +118,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
 
     "replay events duplicated by the event-adapter" in {
 
-      setup("e", 3, tagged("yellow") {
-        case x if x % 2 == 0 => "duplicated:"
-        case _               => ""
-      })
+      setup("e", 3,
+        tagged("yellow") {
+          case x if x % 2 == 0 => "duplicated:"
+          case _               => ""
+        })
 
       val src = queries.eventsByTag("yellow", NoOffset)
       val sub = src.map(_.event).runWith(TestSink.probe[Any])
@@ -129,10 +133,11 @@ class EventAdaptersReadSpec extends CassandraSpec(EventAdaptersReadSpec.config)
 
     "replay events transformed by the event-adapter" in {
 
-      setup("e", 3, tagged("green") {
-        case x if x % 2 == 0 => "prefixed:foo:"
-        case _               => ""
-      })
+      setup("e", 3,
+        tagged("green") {
+          case x if x % 2 == 0 => "prefixed:foo:"
+          case _               => ""
+        })
 
       val src = queries.eventsByTag("green", NoOffset)
       val sub = src.map(_.event).runWith(TestSink.probe[Any])
diff --git a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
index c37cdc8..92c20ed 100644
--- a/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/query/EventsByTagSpec.scala
@@ -200,7 +200,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
       val probe = greenSrc.runWith(TestSink.probe[Any])
       probe.request(2)
-      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple")  => e }
+      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
       probe.expectNoMessage(500.millis)
       probe.request(2)
@@ -233,7 +233,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val greenSrc = queries.currentEventsByTag(tag = "green", offset = NoOffset)
       val probe = greenSrc.runWith(TestSink.probe[Any])
       probe.request(2)
-      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple")  => e }
+      probe.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
       probe.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
       probe.expectNoMessage(waitTime)
 
@@ -274,7 +274,7 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       if (appleTimestamp == bananaTimestamp)
         probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
       probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
-      probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf")   => e }
+      probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
       probe2.cancel()
     }
 
@@ -349,22 +349,23 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
 
     "find new events" in {
       val d = system.actorOf(TestActor.props("d"))
-      withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(2)
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car") => e }
-        probe.expectNoMessage(waitTime)
+      withProbe(queries.eventsByTag(tag = "black", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(2)
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "a black car") => e }
+          probe.expectNoMessage(waitTime)
 
-        d ! "a black dog"
-        expectMsg(s"a black dog-done")
-        d ! "a black night"
-        expectMsg(s"a black night-done")
+          d ! "a black dog"
+          expectMsg(s"a black dog-done")
+          d ! "a black night"
+          expectMsg(s"a black night-done")
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "d", 1L, "a black dog") => e }
-        probe.expectNoMessage(waitTime)
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "d", 2L, "a black night") => e }
-        probe.cancel()
-      })
+          probe.expectNextPF { case e @ EventEnvelope(_, "d", 1L, "a black dog") => e }
+          probe.expectNoMessage(waitTime)
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "d", 2L, "a black night") => e }
+          probe.cancel()
+        })
     }
 
     "find events from timestamp offset" in {
@@ -394,8 +395,8 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
               probe2.request(10)
               if (appleTimestamp == bananaTimestamp)
                 probe2.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
-              probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana")   => e }
-              probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf")     => e }
+              probe2.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }
+              probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
               probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
               probe2.expectNoMessage(waitTime)
             })
@@ -404,19 +405,20 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
     }
 
     "find events from UUID offset " in {
-      withProbe(queries.eventsByTag(tag = "green", offset = NoOffset).runWith(TestSink.probe[Any]), probe1 => {
-        probe1.request(2)
-        probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
-        val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }.offset
-        probe1.cancel()
-
-        val greenSrc2 = queries.eventsByTag(tag = "green", offs)
-        val probe2 = greenSrc2.runWith(TestSink.probe[Any])
-        probe2.request(10)
-        probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf")     => e }
-        probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
-        probe2.expectNoMessage(waitTime)
-      })
+      withProbe(queries.eventsByTag(tag = "green", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe1 => {
+          probe1.request(2)
+          probe1.expectNextPF { case e @ EventEnvelope(_, "a", 2L, "a green apple") => e }
+          val offs = probe1.expectNextPF { case e @ EventEnvelope(_, "a", 4L, "a green banana") => e }.offset
+          probe1.cancel()
+
+          val greenSrc2 = queries.eventsByTag(tag = "green", offs)
+          val probe2 = greenSrc2.runWith(TestSink.probe[Any])
+          probe2.request(10)
+          probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "a green leaf") => e }
+          probe2.expectNextPF { case e @ EventEnvelope(_, "c", 1L, "a green cucumber") => e }
+          probe2.expectNoMessage(waitTime)
+        })
     }
 
     "include timestamp in EventEnvelope" in {
@@ -449,21 +451,22 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val pr2 = PersistentRepr("e2", 2L, "p1", "", writerUuid = w1)
       writeTaggedEvent(t2, pr2, Set("T1-live"), 2, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T1-live", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
-
-        val t3 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
-        val pr3 = PersistentRepr("e3", 3L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t3, pr3, Set("T1-live"), 3, bucketSize)
-        val t4 = LocalDateTime.now(ZoneOffset.UTC)
-        val pr4 = PersistentRepr("e4", 4L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t4, pr4, Set("T1-live"), 4, bucketSize)
-
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "e3") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 4L, "e4") => e }
-      })
+      withProbe(queries.eventsByTag(tag = "T1-live", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "e1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "e2") => e }
+
+          val t3 = LocalDateTime.now(ZoneOffset.UTC).minusMinutes(5)
+          val pr3 = PersistentRepr("e3", 3L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t3, pr3, Set("T1-live"), 3, bucketSize)
+          val t4 = LocalDateTime.now(ZoneOffset.UTC)
+          val pr4 = PersistentRepr("e4", 4L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t4, pr4, Set("T1-live"), 4, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "e3") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 4L, "e4") => e }
+        })
     }
 
     "sort events by timestamp" in {
@@ -476,21 +479,22 @@ class EventsByTagSpec extends AbstractEventsByTagSpec(EventsByTagSpec.config) {
       val pr3 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
       writeTaggedEvent(t3, pr3, Set("T2"), 2, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T2", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-
-        // simulate async eventually consistent Materialized View update
-        // that causes p1-e2 to show up before p2-e1
-        Thread.sleep(500)
-        val t2 = t3.minus(1, ChronoUnit.MILLIS)
-        val pr2 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
-        writeTaggedEvent(t2, pr2, Set("T2"), 1, bucketSize)
+      withProbe(queries.eventsByTag(tag = "T2", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
-        val e3 = probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
-        (System.currentTimeMillis() - e3.timestamp) should be < 10000L
-      })
+          // simulate async eventually consistent Materialized View update
+          // that causes p1-e2 to show up before p2-e1
+          Thread.sleep(500)
+          val t2 = t3.minus(1, ChronoUnit.MILLIS)
+          val pr2 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
+          writeTaggedEvent(t2, pr2, Set("T2"), 1, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
+          val e3 = probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
+          (System.currentTimeMillis() - e3.timestamp) should be < 10000L
+        })
     }
 
     "stream many events" in {
@@ -604,21 +608,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val p2e1 = PersistentRepr("p2-e1", 1L, "p2", "", writerUuid = w2)
       writeTaggedEvent(t2, p2e1, Set("T6"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T6", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
-
-        // delayed, and timestamp is before p2-e1
-        val t3 = t1.plusSeconds(1)
-        val p1e2 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t3, p1e2, Set("T6"), 2, bucketSize)
-        val p1e3 = PersistentRepr("p1-e3", 3L, "p1", "", writerUuid = w1)
-        writeTaggedEvent(t2.plusSeconds(1), p1e3, Set("T6"), 3, bucketSize)
-
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "p1-e3") => e }
-      })
+      withProbe(queries.eventsByTag(tag = "T6", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 1L, "p1-e1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p2", 1L, "p2-e1") => e }
+
+          // delayed, and timestamp is before p2-e1
+          val t3 = t1.plusSeconds(1)
+          val p1e2 = PersistentRepr("p1-e2", 2L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t3, p1e2, Set("T6"), 2, bucketSize)
+          val p1e3 = PersistentRepr("p1-e3", 3L, "p1", "", writerUuid = w1)
+          writeTaggedEvent(t2.plusSeconds(1), p1e3, Set("T6"), 3, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 2L, "p1-e2") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "p1", 3L, "p1-e3") => e }
+        })
     }
 
     "find delayed events 2" in {
@@ -630,21 +635,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t2, eventA1, Set("T7"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T7", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
+      withProbe(queries.eventsByTag(tag = "T7", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
 
-        // delayed, timestamp is before A1
-        val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t1, eventB1, Set("T7"), 1, bucketSize)
-        // second delayed is after A1 so should be found and trigger a search for B1
-        val t3 = t1.plusSeconds(2)
-        val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t3, eventB2, Set("T7"), 2, bucketSize)
+          // delayed, timestamp is before A1
+          val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t1, eventB1, Set("T7"), 1, bucketSize)
+          // second delayed is after A1 so should be found and trigger a search for B1
+          val t3 = t1.plusSeconds(2)
+          val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t3, eventB2, Set("T7"), 2, bucketSize)
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e } // failed in travis
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
-      })
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e } // failed in travis
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+        })
     }
 
     "find delayed events 3" in {
@@ -659,21 +665,22 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t2, eventA1, Set("T8"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T8", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(10)
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
-
-        // delayed, timestamp is before A1
-        val eventB1 = PersistentRepr("B1", 2L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t1, eventB1, Set("T8"), 2, bucketSize)
-        val t3 = t1.plusSeconds(2)
-        val eventB2 = PersistentRepr("B2", 3L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t3, eventB2, Set("T8"), 3, bucketSize)
-
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B1") => e }
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 3L, "B2") => e }
-      })
+      withProbe(queries.eventsByTag(tag = "T8", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(10)
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B0") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }
+
+          // delayed, timestamp is before A1
+          val eventB1 = PersistentRepr("B1", 2L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t1, eventB1, Set("T8"), 2, bucketSize)
+          val t3 = t1.plusSeconds(2)
+          val eventB2 = PersistentRepr("B2", 3L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t3, eventB2, Set("T8"), 3, bucketSize)
+
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 3L, "B2") => e }
+        })
     }
 
     "find delayed events from offset" in {
@@ -684,25 +691,27 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
       val eventA1 = PersistentRepr("A1", 1L, "a", "", writerUuid = w1)
       writeTaggedEvent(t1.plusSeconds(2), eventA1, Set("T9"), 1, bucketSize)
 
-      withProbe(queries.eventsByTag(tag = "T9", offset = NoOffset).runWith(TestSink.probe[Any]), probe1 => {
-        probe1.request(10)
-        val offs =
-          probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }.offset.asInstanceOf[TimeBasedUUID]
+      withProbe(queries.eventsByTag(tag = "T9", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe1 => {
+          probe1.request(10)
+          val offs =
+            probe1.expectNextPF { case e @ EventEnvelope(_, "a", 1L, "A1") => e }.offset.asInstanceOf[TimeBasedUUID]
 
-        withProbe(queries.eventsByTag(tag = "T9", offset = offs).runWith(TestSink.probe[Any]), probe2 => {
-          probe2.request(10)
+          withProbe(queries.eventsByTag(tag = "T9", offset = offs).runWith(TestSink.probe[Any]),
+            probe2 => {
+              probe2.request(10)
 
-          // delayed, timestamp is before A1, i.e. before the offset so should not be picked up
-          val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
-          writeTaggedEvent(t1.plusSeconds(1), eventB1, Set("T9"), 1, bucketSize)
+              // delayed, timestamp is before A1, i.e. before the offset so should not be picked up
+              val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+              writeTaggedEvent(t1.plusSeconds(1), eventB1, Set("T9"), 1, bucketSize)
 
-          // delayed, timestamp is after A1 so should be picked up
-          val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
-          writeTaggedEvent(t1.plusSeconds(3), eventB2, Set("T9"), 2, bucketSize)
+              // delayed, timestamp is after A1 so should be picked up
+              val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+              writeTaggedEvent(t1.plusSeconds(3), eventB2, Set("T9"), 2, bucketSize)
 
-          probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+              probe2.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+            })
         })
-      })
     }
 
     // Not supported atm as it requires us to back track without seeing a future event
@@ -717,29 +726,30 @@ akka.persistence.cassandra.events-by-tag.refresh-internal = 100ms
         writeTaggedEvent(t1.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
       }
 
-      withProbe(queries.eventsByTag(tag = "T10", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(1000)
-        probe.expectNextN(100)
+      withProbe(queries.eventsByTag(tag = "T10", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(1000)
+          probe.expectNextN(100)
 
-        val t2 = t1.plusSeconds(1)
-        (101L to 200L).foreach { n =>
-          val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1)
-          writeTaggedEvent(t2.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
-        }
+          val t2 = t1.plusSeconds(1)
+          (101L to 200L).foreach { n =>
+            val eventA = PersistentRepr(s"A$n", n, "a", "", writerUuid = w1)
+            writeTaggedEvent(t2.plus(n, ChronoUnit.MILLIS), eventA, Set("T10"), n, bucketSize)
+          }
 
-        // delayed, timestamp is before A101 but after A100
-        val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t2.minus(100, ChronoUnit.MILLIS), eventB1, Set("T10"), 1, bucketSize)
+          // delayed, timestamp is before A101 but after A100
+          val eventB1 = PersistentRepr("B1", 1L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t2.minus(100, ChronoUnit.MILLIS), eventB1, Set("T10"), 1, bucketSize)
 
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e }
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 1L, "B1") => e }
 
-        // Now A101 - A200 can be delivered
-        probe.expectNextN(100)
+          // Now A101 - A200 can be delivered
+          probe.expectNextN(100)
 
-        val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
-        writeTaggedEvent(t2.plusSeconds(1), eventB2, Set("T10"), 2, bucketSize)
-        probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
-      })
+          val eventB2 = PersistentRepr("B2", 2L, "b", "", writerUuid = w2)
+          writeTaggedEvent(t2.plusSeconds(1), eventB2, Set("T10"), 2, bucketSize)
+          probe.expectNextPF { case e @ EventEnvelope(_, "b", 2L, "B2") => e }
+        })
     }
 
     "find events from many persistenceIds" in {
@@ -815,10 +825,11 @@ class EventsByTagStrictBySeqNoEarlyFirstOffsetSpec
 
       // the search for delayed events should start before we get to the current timebucket
       // until 0.26/0.51 backtracking was broken and events would be skipped
-      withProbe(queries.eventsByTag(tag = "T11", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(2000)
-        probe.expectNextN(2000)
-      })
+      withProbe(queries.eventsByTag(tag = "T11", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(2000)
+          probe.expectNextN(2000)
+        })
     }
   }
 }
@@ -851,27 +862,30 @@ class EventsByTagLongRefreshIntervalSpec
     sender.expectNoMessage(200.millis) // try and give time for the tagged event to be flushed so the query doesn't need to wait for the refresh interval
 
     val offset: Offset =
-      withProbe(queries.eventsByTag(tag = "animal", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
-        probe.request(2)
-        probe.expectNextPF {
-          case EventEnvelope(offset, `pid`, 1L, "cat") =>
-            offset
-        }
-      })
+      withProbe(queries.eventsByTag(tag = "animal", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
+          probe.request(2)
+          probe.expectNextPF {
+            case EventEnvelope(offset, `pid`, 1L, "cat") =>
+              offset
+          }
+        })
 
     pa.tell(Tagged("cat2", Set("animal")), sender.ref)
     sender.expectMsg("cat2-done")
     // flush interval for tag writes is 0ms but still give some time for the tag write to complete
     sender.expectNoMessage(250.millis)
 
-    withProbe(queries.eventsByTag(tag = "animal", offset = offset).runWith(TestSink.probe[Any]), probe => {
-      probe.request(2)
-      // less than the refresh interval, previously this would evaluate the new persistence-id timeout and then not re-evaluate
-      // it again until the next refresh interval
-      probe.expectNextWithTimeoutPF(2.seconds, {
-        case EventEnvelope(_, `pid`, 2L, "cat2") =>
+    withProbe(queries.eventsByTag(tag = "animal", offset = offset).runWith(TestSink.probe[Any]),
+      probe => {
+        probe.request(2)
+        // less than the refresh interval, previously this would evaluate the new persistence-id timeout and then not re-evaluate
+        // it again until the next refresh interval
+        probe.expectNextWithTimeoutPF(2.seconds,
+          {
+            case EventEnvelope(_, `pid`, 2L, "cat2") =>
+          })
       })
-    })
   }
 }
 
@@ -1011,29 +1025,30 @@ class EventsByTagStrictBySeqMemoryIssueSpec extends AbstractEventsByTagSpec(Even
           writeTaggedEvent(t2, eventB, Set("T14"), n - 112, bucketSize)
       }
 
-      withProbe(queries.eventsByTag(tag = "T14", offset = NoOffset).runWith(TestSink.probe[Any]), probe => {
+      withProbe(queries.eventsByTag(tag = "T14", offset = NoOffset).runWith(TestSink.probe[Any]),
+        probe => {
 
-        val requested1 = 130L
-        probe.request(requested1)
-        val expected1 = 100L + 12 * 2
-        probe.expectNextN(expected1)
-        probe.expectNoMessage(2.seconds)
+          val requested1 = 130L
+          probe.request(requested1)
+          val expected1 = 100L + 12 * 2
+          probe.expectNextN(expected1)
+          probe.expectNoMessage(2.seconds)
 
-        system.log.debug("writing missing event, 113, and a bunch of delayed from C")
-        (1L to 100L).foreach { n =>
-          val eventC = PersistentRepr(s"C$n", n, "c", "", writerUuid = w3)
-          val t = t1.plus(3 * n + 2, ChronoUnit.MILLIS)
-          writeTaggedEvent(t, eventC, Set("T14"), n, bucketSize)
-        }
-        writeTaggedEvent(missingEventTime, missingEvent, Set("T14"), 101, bucketSize)
-        val expected2 = requested1 - expected1
-        probe.expectNextN(expected2)
-        probe.expectNoMessage(200.millis)
-
-        probe.request(1000)
-        probe.expectNextN(8 + 100 - expected2)
-        probe.expectNoMessage(200.millis)
-      })
+          system.log.debug("writing missing event, 113, and a bunch of delayed from C")
+          (1L to 100L).foreach { n =>
+            val eventC = PersistentRepr(s"C$n", n, "c", "", writerUuid = w3)
+            val t = t1.plus(3 * n + 2, ChronoUnit.MILLIS)
+            writeTaggedEvent(t, eventC, Set("T14"), n, bucketSize)
+          }
+          writeTaggedEvent(missingEventTime, missingEvent, Set("T14"), 101, bucketSize)
+          val expected2 = requested1 - expected1
+          probe.expectNextN(expected2)
+          probe.expectNoMessage(200.millis)
+
+          probe.request(1000)
+          probe.expectNextN(8 + 100 - expected2)
+          probe.expectNoMessage(200.millis)
+        })
     }
 
     "find all events" in {
diff --git a/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala b/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
index 487f409..df7c4a2 100644
--- a/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
+++ b/core/src/test/scala/akka/persistence/cassandra/testkit/CassandraLauncherSpec.scala
@@ -52,8 +52,8 @@ class CassandraLauncherSpec
         CassandraLauncher.classpathForResources("logback-test.xml"))
 
       awaitAssert({
-        testCassandra()
-      }, 45.seconds)
+          testCassandra()
+        }, 45.seconds)
 
       CassandraLauncher.stop()
 
diff --git a/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala b/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
index a7c7a72..913950c 100644
--- a/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
+++ b/core/src/test/scala/doc/reconciler/AllPersistenceIdsMigrationCompileOnly.scala
@@ -15,7 +15,7 @@ import akka.persistence.cassandra.reconciler.Reconciliation
 
 class AllPersistenceIdsMigrationCompileOnly {
 
-  //#migrate
+  // #migrate
   // System should have the same Cassandra plugin configuration as your application
   // but be careful to remove seed nodes so this doesn't join the cluster
   val system = ActorSystem()
@@ -32,5 +32,5 @@ class AllPersistenceIdsMigrationCompileOnly {
       system.log.error(e, "All persistenceIds migration failed.")
       system.terminate()
   }
-  //#migrate
+  // #migrate
 }
diff --git a/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala b/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
index af8b63b..8c321bd 100644
--- a/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
+++ b/core/src/test/scala/doc/reconciler/ReconciliationCompileOnly.scala
@@ -14,7 +14,7 @@ import akka.Done
 
 class ReconciliationCompileOnly {
 
-  //#reconcile
+  // #reconcile
   // System should have the same Cassandra plugin configuration as your application
   // but be careful to remove seed nodes so this doesn't join the cluster
   val system = ActorSystem()
@@ -32,5 +32,5 @@ class ReconciliationCompileOnly {
    // optional: re-build, if this is omitted then it will be re-built next time the pid is started
     _ <- rec.rebuildTagViewForPersistenceIds(pid)
   } yield Done
-  //#reconcile
+  // #reconcile
 }
diff --git a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
index 1a68ec9..103d25d 100644
--- a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
+++ b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
@@ -12,7 +12,7 @@ object CleanupDocExample {
 
   implicit val system: ActorSystem = ???
 
-  //#cleanup
+  // #cleanup
   val queries = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
   val cleanup = new Cleanup(system)
 
@@ -31,6 +31,6 @@ object CleanupDocExample {
     .mapAsync(persistenceIdParallelism)(pid => cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant.toEpochMilli))
     .run()
 
-  //#cleanup
+  // #cleanup
 
 }
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
index 2ea8d58..a50e3e6 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala
@@ -38,8 +38,8 @@ class EventProcessorStream[Event: ClassTag](
           readOffset().map { offset =>
             log.infoN("Starting stream for tag [{}] from offset [{}]", tag, offset)
             processEventsByTag(offset, histogram)
-            // groupedWithin can be used here to improve performance by reducing number of offset writes,
-            // with the trade-off of possibility of more duplicate events when stream is restarted
+              // groupedWithin can be used here to improve performance by reducing number of offset writes,
+              // with the trade-off of possibility of more duplicate events when stream is restarted
               .mapAsync(1)(writeOffset)
           }
         }
@@ -52,21 +52,21 @@ class EventProcessorStream[Event: ClassTag](
     query.eventsByTag(tag, offset).mapAsync(1) { eventEnvelope =>
       eventEnvelope.event match {
         case event: Event => {
-          // Times from different nodes, take with a pinch of salt
-          val latency = System.currentTimeMillis() - eventEnvelope.timestamp
-          // when restarting without the offset the latency will be too big
-          if (latency < histogram.getMaxValue) {
-            histogram.recordValue(latency)
-          }
-          log.debugN(
-            "Tag {} Event {} persistenceId {}, sequenceNr {}. Latency {}",
-            tag,
-            event,
-            PersistenceId.ofUniqueId(eventEnvelope.persistenceId),
-            eventEnvelope.sequenceNr,
-            latency)
-          Future.successful(Done)
-        }.map(_ => eventEnvelope.offset)
+            // Times from different nodes, take with a pinch of salt
+            val latency = System.currentTimeMillis() - eventEnvelope.timestamp
+            // when restarting without the offset the latency will be too big
+            if (latency < histogram.getMaxValue) {
+              histogram.recordValue(latency)
+            }
+            log.debugN(
+              "Tag {} Event {} persistenceId {}, sequenceNr {}. Latency {}",
+              tag,
+              event,
+              PersistenceId.ofUniqueId(eventEnvelope.persistenceId),
+              eventEnvelope.sequenceNr,
+              latency)
+            Future.successful(Done)
+          }.map(_ => eventEnvelope.offset)
         case other =>
           Future.failed(new IllegalArgumentException(s"Unexpected event [${other.getClass.getName}]"))
       }
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
index 8809898..183b90f 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala
@@ -18,22 +18,22 @@ object Main {
   def main(args: Array[String]): Unit = {
 
     ActorSystem(Behaviors.setup[SelfUp] {
-      ctx =>
-        val readSettings = ReadSide.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
-        val writeSettings = ConfigurablePersistentActor.Settings(readSettings.nrTags)
-        val loadSettings = LoadGenerator.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
+        ctx =>
+          val readSettings = ReadSide.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
+          val writeSettings = ConfigurablePersistentActor.Settings(readSettings.nrTags)
+          val loadSettings = LoadGenerator.Settings(ctx.system.settings.config.getConfig("cassandra.example"))
 
-        AkkaManagement(ctx.system).start()
-        ClusterBootstrap(ctx.system).start()
-        val cluster = Cluster(ctx.system)
-        cluster.subscriptions ! Subscribe(ctx.self, classOf[SelfUp])
+          AkkaManagement(ctx.system).start()
+          ClusterBootstrap(ctx.system).start()
+          val cluster = Cluster(ctx.system)
+          cluster.subscriptions ! Subscribe(ctx.self, classOf[SelfUp])
 
-        val topic = ReadSideTopic.init(ctx)
+          val topic = ReadSideTopic.init(ctx)
 
-        if (cluster.selfMember.hasRole("read")) {
-          val session = CassandraSessionRegistry(ctx.system).sessionFor("akka.persistence.cassandra")
-          val offsetTableStmt =
-            """
+          if (cluster.selfMember.hasRole("read")) {
+            val session = CassandraSessionRegistry(ctx.system).sessionFor("akka.persistence.cassandra")
+            val offsetTableStmt =
+              """
               CREATE TABLE IF NOT EXISTS akka.offsetStore (
                 eventProcessorId text,
                 tag text,
@@ -42,27 +42,27 @@ object Main {
               )
            """
 
-          Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
-        }
+            Await.ready(session.executeDDL(offsetTableStmt), 30.seconds)
+          }
 
-        Behaviors.receiveMessage {
-          case SelfUp(state) =>
-            ctx.log.infoN(
-              "Cluster member joined. Initializing persistent actors. Roles {}. Members {}",
-              cluster.selfMember.roles,
-              state.members)
-            val ref = ConfigurablePersistentActor.init(writeSettings, ctx.system)
-            if (cluster.selfMember.hasRole("read")) {
-              ctx.spawnAnonymous(Reporter(topic))
-            }
-            ReadSide(ctx.system, topic, readSettings)
-            if (cluster.selfMember.hasRole("load")) {
-              ctx.log.info("Starting load generation")
-              val load = ctx.spawn(LoadGenerator(loadSettings, ref), "load-generator")
-              load ! Start(10.seconds)
-            }
-            Behaviors.empty
-        }
-    }, "apc-example")
+          Behaviors.receiveMessage {
+            case SelfUp(state) =>
+              ctx.log.infoN(
+                "Cluster member joined. Initializing persistent actors. Roles {}. Members {}",
+                cluster.selfMember.roles,
+                state.members)
+              val ref = ConfigurablePersistentActor.init(writeSettings, ctx.system)
+              if (cluster.selfMember.hasRole("read")) {
+                ctx.spawnAnonymous(Reporter(topic))
+              }
+              ReadSide(ctx.system, topic, readSettings)
+              if (cluster.selfMember.hasRole("load")) {
+                ctx.log.info("Starting load generation")
+                val load = ctx.spawn(LoadGenerator(loadSettings, ref), "load-generator")
+                load ! Start(10.seconds)
+              }
+              Behaviors.empty
+          }
+      }, "apc-example")
   }
 }
diff --git a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
index 91ca0d8..81f64d4 100644
--- a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
+++ b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala
@@ -47,7 +47,7 @@ object ReadSide {
     Behaviors.withTimers { timers =>
       timers.startTimerAtFixedRate(ReportMetrics, 10.second)
       Behaviors.setup { ctx =>
-        val start = (settings.tagsPerProcessor * nr)
+        val start = settings.tagsPerProcessor * nr
         val end = start + (settings.tagsPerProcessor) - 1
         val tags = (start to end).map(i => s"tag-$i")
         ctx.log.info("Processor {} processing tags {}", nr, tags)
@@ -59,13 +59,12 @@ object ReadSide {
         // having more tags will also increase write throughput/latency as it'll write to
         // many partitions
         // downside is running many streams/queries against c*
-        tags.foreach(
-          tag =>
-            new EventProcessorStream[ConfigurablePersistentActor.Event](
-              ctx.system,
-              ctx.executionContext,
-              s"processor-$nr",
-              tag).runQueryStream(killSwitch, histogram))
+        tags.foreach(tag =>
+          new EventProcessorStream[ConfigurablePersistentActor.Event](
+            ctx.system,
+            ctx.executionContext,
+            s"processor-$nr",
+            tag).runQueryStream(killSwitch, histogram))
 
         Behaviors
           .receiveMessage[Command] {
diff --git a/project/Common.scala b/project/Common.scala
index 4566f7d..83e9176 100644
--- a/project/Common.scala
+++ b/project/Common.scala
@@ -22,14 +22,14 @@ object Common extends AutoPlugin {
       homepage := Some(url("https://akka.io")),
       // apiURL defined in projectSettings because version.value is not correct here
       scmInfo := Some(
-          ScmInfo(
-            url("https://github.com/akka/akka-persistence-cassandra"),
-            "git@github.com:akka/akka-persistence-cassandra.git")),
+        ScmInfo(
+          url("https://github.com/akka/akka-persistence-cassandra"),
+          "git@github.com:akka/akka-persistence-cassandra.git")),
       developers += Developer(
-          "contributors",
-          "Contributors",
-          "https://gitter.im/akka/dev",
-          url("https://github.com/akka/akka-persistence-cassandra/graphs/contributors")),
+        "contributors",
+        "Contributors",
+        "https://gitter.im/akka/dev",
+        url("https://github.com/akka/akka-persistence-cassandra/graphs/contributors")),
       licenses := Seq(("Apache-2.0", url("https://www.apache.org/licenses/LICENSE-2.0"))),
       description := "A Cassandra plugin for Akka Persistence.")
 
@@ -41,27 +41,27 @@ object Common extends AutoPlugin {
     scalacOptions ++= Seq("-encoding", "UTF-8", "-feature", "-unchecked", "-Xlint", "-Ywarn-dead-code", "-deprecation"),
     Compile / console / scalacOptions --= Seq("-deprecation", "-Xfatal-warnings", "-Xlint", "-Ywarn-unused:imports"),
     Compile / doc / scalacOptions := scalacOptions.value ++ Seq(
-        "-doc-title",
-        "Akka Persistence Cassandra",
-        "-doc-version",
-        version.value,
-        "-sourcepath",
-        (ThisBuild / baseDirectory).value.toString,
-        "-doc-source-url", {
-          val branch = if (isSnapshot.value) "master" else s"v${version.value}"
-          s"https://github.com/akka/akka-persistence-cassandra/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
-        },
-        "-doc-canonical-base-url",
-        "https://doc.akka.io/api/akka-persistence-cassandra/current/",
-        "-skip-packages",
-        "akka.pattern" // for some reason Scaladoc creates this
-      ),
+      "-doc-title",
+      "Akka Persistence Cassandra",
+      "-doc-version",
+      version.value,
+      "-sourcepath",
+      (ThisBuild / baseDirectory).value.toString,
+      "-doc-source-url", {
+        val branch = if (isSnapshot.value) "master" else s"v${version.value}"
+        s"https://github.com/akka/akka-persistence-cassandra/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
+      },
+      "-doc-canonical-base-url",
+      "https://doc.akka.io/api/akka-persistence-cassandra/current/",
+      "-skip-packages",
+      "akka.pattern" // for some reason Scaladoc creates this
+    ),
     Compile / doc / scalacOptions --= Seq("-Xfatal-warnings"),
     scalafmtOnCompile := true,
     autoAPIMappings := true,
     apiURL := Some(url(s"https://doc.akka.io/api/akka-persistence-cassandra/${projectInfoVersion.value}")),
     headerLicense := Some(
-        HeaderLicense.Custom("""Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>""")),
+      HeaderLicense.Custom("""Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>""")),
     sonatypeProfileName := "com.typesafe",
     Test / logBuffered := System.getProperty("akka.logBufferedTests", "false").toBoolean,
     // show full stack traces and test case durations
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 6e26982..da1e3a4 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -36,15 +36,15 @@ object Dependencies {
     "com.typesafe.akka" %% "akka-cluster-sharding")
 
   val akkaPersistenceCassandraDependencies = Seq(
-      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % AlpakkaVersion,
-      "com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
-      "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
-      "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
-      "org.scala-lang.modules" %% "scala-collection-compat" % "2.4.4",
-      Logback % Test,
-      "org.scalatest" %% "scalatest" % "3.2.11" % Test,
-      "org.pegdown" % "pegdown" % "1.6.0" % Test,
-      "org.osgi" % "org.osgi.core" % "5.0.0" % Provided) ++ akkaTestDeps.map(_ % AkkaVersion % Test)
+    "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % AlpakkaVersion,
+    "com.typesafe.akka" %% "akka-persistence" % AkkaVersion,
+    "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
+    "com.typesafe.akka" %% "akka-cluster-tools" % AkkaVersion,
+    "org.scala-lang.modules" %% "scala-collection-compat" % "2.4.4",
+    Logback % Test,
+    "org.scalatest" %% "scalatest" % "3.2.11" % Test,
+    "org.pegdown" % "pegdown" % "1.6.0" % Test,
+    "org.osgi" % "org.osgi.core" % "5.0.0" % Provided) ++ akkaTestDeps.map(_ % AkkaVersion % Test)
 
   val exampleDependencies = Seq(
     Logback,




[incubator-pekko-persistence-cassandra] 06/07: Replace travis references with github actions (#5)

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit 7d943ea9be36c566f2c846661f9dedb2f8b52593
Author: Matthew de Detrich <ma...@aiven.io>
AuthorDate: Tue Nov 15 14:44:07 2022 +0100

    Replace travis references with github actions (#5)
---
 .jvmopts-travis                      | 4 ----
 CONTRIBUTING.md                      | 8 +-------
 README.md                            | 2 +-
 docs/release-train-issue-template.md | 4 ++--
 4 files changed, 4 insertions(+), 14 deletions(-)

diff --git a/.jvmopts-travis b/.jvmopts-travis
deleted file mode 100644
index c721e39..0000000
--- a/.jvmopts-travis
+++ /dev/null
@@ -1,4 +0,0 @@
-# This is used to configure the sbt instance that Travis launches
-
--Dfile.encoding=UTF8
--Dsbt.color=always
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 83feaae..681231a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -14,7 +14,7 @@ You may also check out these [other resources](https://akka.io/get-involved/).
 
 * [Lightbend Contributor License Agreement](https://www.lightbend.com/contribute/cla)
 * [Issue Tracker](https://github.com/akka/akka-persistence-cassandra/issues)
-* [CI](https://travis-ci.org/akka/akka-persistence-cassandra)
+* [CI](https://github.com/apache/incubator-pekko-persistence-cassandra/actions)
 
 # Lightbend Project & Developer Guidelines
 
@@ -146,12 +146,6 @@ Example:
     * Details 2
     * Details 3
 
-## How To Enforce These Guidelines?
-
-### Make Use of Pull Request Validator
-akka-persistence-cassandra uses [Travis pull request builder](https://travis-ci.org/akka/akka-persistence-cassandra) 
-that automatically merges the code, builds it, runs the tests and comments on the Pull Request in GitHub.
-
 ## Source style
 
 akka-persistence-cassandra uses [Scalafmt](https://scalameta.org/scalafmt/) to enforce some of the code style rules.
diff --git a/README.md b/README.md
index 83562c2..e8e69c4 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ For questions please use the [discuss.akka.io](https://discuss.lightbend.com/c/a
 
 Implementation in the `master` branch is currently `1.0.x` release.
 
-[![Build Status](https://travis-ci.org/akka/akka-persistence-cassandra.svg?branch=master)](https://travis-ci.org/akka/akka-persistence-cassandra)
+[![Build Status](https://github.com/apache/incubator-pekko-persistence-cassandra/actions/workflows/check-build-test.yml/badge.svg?branch=main)](https://github.com/apache/incubator-pekko-persistence-cassandra/actions/workflows/check-build-test.yml?query=branch%3Amain)
 
 
 ## Documentation
diff --git a/docs/release-train-issue-template.md b/docs/release-train-issue-template.md
index e19d094..9ea60ae 100644
--- a/docs/release-train-issue-template.md
+++ b/docs/release-train-issue-template.md
@@ -26,9 +26,9 @@ Variables to be expanded in this template:
 
 ### Cutting the release
 
-- [ ] Make sure any running [Travis builds](https://travis-ci.org/akka/akka-persistence-cassandra) for the commit you would like to release have completed.
+- [ ] Make sure any running [Github Action builds](https://github.com/apache/incubator-pekko-persistence-cassandra/actions) for the commit you would like to release have completed.
 - [ ] Create the [release notes](https://github.com/akka/akka-persistence-cassandra/releases) with the next tag version `v$VERSION$`, title, release description and contributors (the latter generated by [`authors`](https://github.com/2m/authors) (eg. `authors v1.0.1 HEAD`))
-- [ ] Check that Travis CI release build has executed successfully (Travis will start a [CI build](https://travis-ci.org/akka/akka-persistence-cassandra/builds) for the new tag and publish artifacts to Bintray and documentation to Gustav)
+- [ ] Check that Github Actions CI release build has executed successfully (Github Actions will start a [CI build](https://github.com/apache/incubator-pekko-persistence-cassandra/actions) for the new tag and publish artifacts to Bintray and documentation to Gustav)
 - [ ] Go to [Bintray](https://bintray.com/akka/maven/akka-persistence-cassandra) and select the just released version
 - [ ] Log in, go to the Maven Central tab, check the *Close and release repository when done* checkbox and sync with Sonatype (using your Sonatype TOKEN key and password)
 




[incubator-pekko-persistence-cassandra] 07/07: unprotect main branch so merge commit can be removed (#6)

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit 716652908f6a0bbb3d2304088e7a99254f811195
Author: PJ Fanning <pj...@users.noreply.github.com>
AuthorDate: Tue Nov 15 20:35:10 2022 +0100

    unprotect main branch so merge commit can be removed (#6)
---
 .asf.yaml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index 26e7e88..0119553 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -25,8 +25,7 @@ github:
     merge:   false
     rebase:  true
 
-  protected_branches:
-    main: { }
+  protected_branches: ~
 
 notifications:
   commits:              commits@pekko.apache.org




[incubator-pekko-persistence-cassandra] 04/07: try to fix CI build (#3)

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit 65de2a6b08ec601749a76c11284189ea03e88572
Author: PJ Fanning <pj...@users.noreply.github.com>
AuthorDate: Tue Nov 15 12:22:14 2022 +0100

    try to fix CI build (#3)
    
    * try to fix CI build
    
    * Update publish.yml
    
    * Update release-drafter.yml
    
    * Update publish.yml
    
    * disable fetch tags
---
 .github/workflows/check-build-test.yml | 16 +++++++++-------
 .github/workflows/publish.yml          |  2 +-
 .github/workflows/release-drafter.yml  |  1 +
 3 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml
index fc15d3e..6e39fa8 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -12,7 +12,7 @@ on:
 jobs:
   style-compile:
     name: Compile, Code Style
-    if: github.repository == 'akka/akka-persistence-cassandra'
+    if: github.repository == 'apache/incubator-pekko-persistence-cassandra'
     runs-on: ubuntu-20.04
     env:
       JAVA_OPTS: -Xms2G -Xmx2G -Xss2M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
@@ -23,8 +23,9 @@ jobs:
         with: # https://github.com/olafurpg/setup-scala#faster-checkout-of-big-repos
           fetch-depth: 0
 
-      - name: Fetch tags
-        run: git fetch --depth=100 origin +refs/tags/*:refs/tags/*
+      # temporarily disable git fetch of tags (no tags currently) 
+      #- name: Fetch tags
+      #  run: git fetch --depth=100 origin +refs/tags/*:refs/tags/*
 
       - name: Set up JDK 11
         uses: coursier/setup-action@v1.1.2
@@ -40,7 +41,7 @@ jobs:
 
   documentation:
     name: ScalaDoc, Documentation with Paradox
-    if: github.repository == 'akka/akka-persistence-cassandra'
+    if: github.repository == 'apache/incubator-pekko-persistence-cassandra'
     runs-on: ubuntu-20.04
     env:
       JAVA_OPTS: -Xms2G -Xmx2G -Xss2M -XX:ReservedCodeCacheSize=256M -Dfile.encoding=UTF-8
@@ -51,8 +52,9 @@ jobs:
         with: # https://github.com/olafurpg/setup-scala#faster-checkout-of-big-repos
           fetch-depth: 100
 
-      - name: Fetch tags
-        run: git fetch --depth=100 origin +refs/tags/*:refs/tags/*
+      # temporarily disable git fetch of tags (no tags currently) 
+      #- name: Fetch tags
+      #  run: git fetch --depth=100 origin +refs/tags/*:refs/tags/*
 
       - name: Set up JDK 11
         uses: coursier/setup-action@v1.1.2
@@ -70,7 +72,7 @@ jobs:
 
   test:
     name: Test
-    if: github.repository == 'akka/akka-persistence-cassandra'
+    if: github.repository == 'apache/incubator-pekko-persistence-cassandra'
     runs-on: ubuntu-20.04
 
     strategy:
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 7933045..868674d 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -13,7 +13,7 @@ on:
 jobs:
   release:
     # runs on main repo only
-    if: github.repository == 'akka/akka-persistence-cassandra'
+    if: github.repository == 'apache/incubator-pekko-persistence-cassandra'
     name: Release
     environment: release
     runs-on: ubuntu-20.04
diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml
index ffb5088..934c107 100644
--- a/.github/workflows/release-drafter.yml
+++ b/.github/workflows/release-drafter.yml
@@ -5,6 +5,7 @@ on:
     # branches to consider in the event; optional, defaults to all
     branches:
       - master
+      - main
 
 jobs:
   update_release_draft:




[incubator-pekko-persistence-cassandra] 05/07: Remove circleci config (#4)

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit 81308c4820964725814890559e15de47c8d33317
Author: Matthew de Detrich <ma...@aiven.io>
AuthorDate: Tue Nov 15 14:25:41 2022 +0100

    Remove circleci config (#4)
---
 .circleci/config.yml | 45 ---------------------------------------------
 1 file changed, 45 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
deleted file mode 100644
index e0b3337..0000000
--- a/.circleci/config.yml
+++ /dev/null
@@ -1,45 +0,0 @@
-version: 2
-jobs:
-  build:
-    working_directory: ~/repo
-
-    environment:
-      TERM: dumb
-      CASSANDRA_MODE: external
-      AKKA_TEST_TIMEFACTOR: 10
-
-    docker:
-      - image: bartektomala/scala-sbt:8u181_2.12.6_0.13.17
-
-      - image: scylladb/scylla:2.2.0
-        command: --listen-address 0.0.0.0 --broadcast-rpc-address 0.0.0.0 --experimental 1
-
-    steps:
-      - checkout
-
-      - restore_cache:
-          name: Restoring sbt cache
-          keys:
-            - sbt
-
-      - run:
-          name: Update sbt cache
-          command: sbt update
-
-      - save_cache:
-          name: Save updated sbt cache
-          key: sbt
-          paths:
-            - "/root/.sbt"
-            - "/root/.ivy2"
-
-      - run:
-          name: tests
-          command: |
-            sbt ';set testOptions in Global += Tests.Argument(TestFrameworks.ScalaTest, "-u", "./target/xml-reports/scylladb");test'
-
-      - store_test_results:
-          path: ./target/xml-reports
-
-      - store_artifacts:
-          path: ./target/test-reports
\ No newline at end of file




[incubator-pekko-persistence-cassandra] 02/07: Add .gitattributes to enforce unix line endings

Posted by fa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git

commit 955b84d2f569022ce5d24798396c2170e472d258
Author: Matthew de Detrich <ma...@aiven.io>
AuthorDate: Sat Nov 12 07:40:37 2022 +0100

    Add .gitattributes to enforce unix line endings
---
 .gitattributes | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..9dde9b9
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,5 @@
+# Activate line ending normalization, setting eol will make the behavior match core.autocrlf = input
+* text=auto eol=lf
+# Force batch scripts to always use CRLF line endings
+*.{cmd,[cC][mM][dD]} text eol=crlf
+*.{bat,[bB][aA][tT]} text eol=crlf


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org