You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:05 UTC

[incubator-pekko-samples] 01/01: Use persistence-multi-dc-testkit

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch akka-sample-persistence-multi-dc-scala
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git

commit 24003fafca2e062b2adfb23aee6316476bf42d0e
Author: Arnout Engelen <ar...@bzzt.net>
AuthorDate: Wed Oct 25 17:08:15 2017 +0200

    Use persistence-multi-dc-testkit
---
 akka-sample-persistence-multi-dc-scala/build.sbt   |  10 +-
 .../src/test/resources/logback-test.xml            |  18 ---
 .../persistence/multidc/testkit/BaseSpec.scala     | 131 ---------------------
 .../multidc/testkit/CassandraLifecycle.scala       | 107 -----------------
 ...InterruptableCassandraReadJournalProvider.scala | 108 -----------------
 5 files changed, 4 insertions(+), 370 deletions(-)

diff --git a/akka-sample-persistence-multi-dc-scala/build.sbt b/akka-sample-persistence-multi-dc-scala/build.sbt
index 8a1486b..27fa2b0 100644
--- a/akka-sample-persistence-multi-dc-scala/build.sbt
+++ b/akka-sample-persistence-multi-dc-scala/build.sbt
@@ -8,13 +8,11 @@ resolvers += "com-mvn" at "https://repo.lightbend.com/commercial-releases/"
 resolvers += Resolver.url("com-ivy",
   url("https://repo.lightbend.com/commercial-releases/"))(Resolver.ivyStylePatterns)
 
+val persistenceMultiDcVersion = "1.1-M4+3-667a6ef6"
+
 libraryDependencies ++= Seq(
-  "com.lightbend.akka" %% "akka-persistence-multi-dc" % "1.1-M4",
-  // TODO replace with akka-persistence-multi-dc-testkit and move test infra there
-  "com.typesafe.akka" %% "akka-testkit" % "2.5.6" % "test",
-  // TODO make dependency of akka-persistence-multi-dc-testkit
-  "com.typesafe.akka" %% "akka-stream-contrib" % "0.8" % "test",
-  "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.58" % "test",
+  "com.lightbend.akka" %% "akka-persistence-multi-dc" % persistenceMultiDcVersion,
+  "com.lightbend.akka" %% "akka-persistence-multi-dc-testkit" % persistenceMultiDcVersion % "test",
   "org.scalatest" %% "scalatest" % "3.0.1" % "test"
 )
 
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml b/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml
deleted file mode 100644
index e75548e..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
-    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-        <target>System.out</target>
-        <encoder>
-            <pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %m%n%xException</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="org.apache.cassandra" level="ERROR" />
-    <logger name="com.datastax.driver.core" level="WARN" />
-
-    <root level="INFO">
-        <appender-ref ref="CONSOLE"/>
-    </root>
-
-</configuration>
\ No newline at end of file
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala
deleted file mode 100644
index 2f99118..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-package akka.persistence.multidc.testkit
-
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-
-import akka.actor.ActorRef
-import akka.actor.ActorSystem
-import akka.cluster.Cluster
-import akka.persistence.multidc.PersistenceMultiDcSettings
-import akka.persistence.query.PersistenceQuery
-import akka.testkit.ImplicitSender
-import akka.testkit.TestKit
-import akka.testkit.TestProbe
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.Matchers
-import org.scalatest.WordSpecLike
-import org.scalatest.BeforeAndAfter
-
-import akka.persistence.multidc.testkit._
-import akka.persistence.multidc.internal.CassandraReplicatedEventQuery
-import akka.persistence.multidc.internal.ReplicatedEventEnvelope
-
-object BaseSpec {
-  val clusterConfig = ConfigFactory.parseString("""
-    akka.actor.provider = "cluster"
-    akka.remote.netty.tcp.port = 0
-    akka.remote.artery.canonical.port = 0
-    akka.remote.artery.canonical.hostname = 127.0.0.1
-    akka.cluster.jmx.multi-mbeans-in-same-jvm = on
-
-    # inceasing probability due to issue https://github.com/akka/akka/issues/23803
-    akka.cluster.multi-data-center.cross-data-center-gossip-probability = 0.5
-
-    # speed up joining and such
-    akka.cluster.gossip-interval = 500 ms
-  """)
-
-  def createFirstSystem(name: String): ActorSystem = createFirstSystem(name, ConfigFactory.empty())
-  def createFirstSystem(name: String, cfg: Config): ActorSystem = ActorSystem(
-    name,
-    cfg.withFallback(ConfigFactory.parseString(s"""
-    akka.loglevel = INFO
-    akka.cluster.multi-data-center.self-data-center = DC-A
-    akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
-    cassandra-journal-multi-dc.keyspace=$name
-    cassandra-snapshot-store.keyspace=${name}Snapshot
-    cassandra-query-journal-multi-dc.class = "${classOf[InterruptableCassandraReadJournalProvider].getName}"
-    """)).withFallback(BaseSpec.clusterConfig).withFallback(CassandraLifecycle.config))
-
-  def otherSystemSettings: Config =
-    ConfigFactory.parseString("""
-        akka.cluster.multi-data-center.self-data-center = DC-B
-        akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
-        """)
-
-  def createOtherSystem(firstSystem: ActorSystem) =
-    ActorSystem(
-      firstSystem.name,
-      ConfigFactory.parseString("""
-        akka.cluster.multi-data-center.self-data-center = DC-B
-        akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
-        """).withFallback(firstSystem.settings.config))
-
-  def createThirdSystem(firstSystem: ActorSystem) =
-    ActorSystem(
-      firstSystem.name,
-      ConfigFactory.parseString("""
-        akka.cluster.multi-data-center.self-data-center = DC-C
-        akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
-        """).withFallback(firstSystem.settings.config))
-
-}
-
-// TODO move a lot of this to a trait in testkit?
-abstract class BaseSpec(name: String, cfg: Config = ConfigFactory.empty)
-  extends TestKit(BaseSpec.createFirstSystem(name, cfg))
-  with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfter with CassandraLifecycleScalatest {
-  import BaseSpec._
-
-  val otherSystem = createOtherSystem(system)
-  val thirdSystem = createThirdSystem(system)
-
-  val settings = PersistenceMultiDcSettings(system)
-  val otherSettings = PersistenceMultiDcSettings(otherSystem)
-  val thirdSettings = PersistenceMultiDcSettings(thirdSystem)
-
-  private def queries(sys: ActorSystem): InterruptableCassandraReplicatedEventQuery =
-    PersistenceQuery(sys).readJournalFor[InterruptableCassandraReplicatedEventQuery](CassandraReplicatedEventQuery.Identifier)
-
-  def disableReplication(from: ActorSystem, to: ActorSystem) =
-    queries(to).disableReplication(Cluster(from).selfDataCenter)
-
-  def enableReplication(from: ActorSystem, to: ActorSystem) =
-    queries(to).enableReplication(Cluster(from).selfDataCenter)
-
-  private[multidc] def addErrorFilter(sys: ActorSystem, key: String)(f: ReplicatedEventEnvelope => Option[Throwable]): Unit =
-    queries(sys).addErrorFilter(key)(f)
-
-  def removeErrorFilter(sys: ActorSystem, key: String): Unit =
-    queries(sys).removeErrorFilter(key)
-
-  override def systemName: String = name
-
-  after {
-    Seq(system, otherSystem, thirdSystem).foreach { sys =>
-      queries(sys).enableAll()
-    }
-  }
-
-  override protected def afterAll(): Unit = {
-    shutdown(otherSystem)
-    shutdown(thirdSystem)
-    shutdown()
-    super.afterAll()
-  }
-
-  def stopA(ref: ActorRef): Unit =
-    stop(ref, system)
-
-  def stopB(ref: ActorRef): Unit =
-    stop(ref, otherSystem)
-
-  def stop(ref: ActorRef, sys: ActorSystem): Unit = {
-    val probe = TestProbe()(sys)
-    probe.watch(ref)
-    sys.stop(ref)
-    probe.expectTerminated(ref)
-  }
-
-}
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala
deleted file mode 100644
index caac3a4..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
- */
-package akka.persistence.multidc.testkit
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration._
-import akka.actor.ActorSystem
-import akka.actor.Props
-import akka.persistence.PersistentActor
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.testkit.TestKitBase
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.Suite
-
-object CassandraLifecycle {
-
-  val config = ConfigFactory.parseString(s"""
-    akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
-    cassandra-journal-multi-dc.port = ${CassandraLauncher.randomPort}
-    cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
-    cassandra-journal-multi-dc.circuit-breaker.call-timeout = 30s
-    akka.test.single-expect-default = 10s
-    """)
-
-  def awaitPersistenceInit(system: ActorSystem, journalPluginId: String = "", snapshotPluginId: String = ""): Unit = {
-    val probe = TestProbe()(system)
-    val t0 = System.nanoTime()
-    var n = 0
-    probe.within(45.seconds) {
-      probe.awaitAssert {
-        n += 1
-        system.actorOf(Props(classOf[AwaitPersistenceInit], journalPluginId, snapshotPluginId), "persistenceInit" + n).tell("hello", probe.ref)
-        probe.expectMsg(5.seconds, "hello")
-        system.log.debug("awaitPersistenceInit took {} ms {}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0), system.name)
-      }
-    }
-  }
-
-  class AwaitPersistenceInit(
-    override val journalPluginId:  String,
-    override val snapshotPluginId: String) extends PersistentActor {
-    def persistenceId: String = "persistenceInit"
-
-    def receiveRecover: Receive = {
-      case _ =>
-    }
-
-    def receiveCommand: Receive = {
-      case msg =>
-        persist(msg) { _ =>
-          sender() ! msg
-          context.stop(self)
-        }
-    }
-  }
-
-  def startCassandra(systemName: String, cassandraConfigResource: String): Unit =
-    startCassandra(None, None, systemName, cassandraConfigResource)
-
-  def startCassandra(host: Option[String], port: Option[Int],
-                     systemName:              String,
-                     cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource): Unit = {
-    val cassandraDirectory = new File("target/" + systemName)
-    CassandraLauncher.start(
-      cassandraDirectory,
-      configResource = cassandraConfigResource,
-      clean = true,
-      port = port.getOrElse(0),
-      CassandraLauncher.classpathForResources("logback-test.xml"),
-      host)
-  }
-
-  def stopCassandra(): Unit = {
-    CassandraLauncher.stop()
-  }
-
-  def awaitPersistenceInit(system: ActorSystem): Unit = {
-    CassandraLifecycle.awaitPersistenceInit(system, journalPluginId = "cassandra-journal-multi-dc")
-  }
-
-}
-
-trait CassandraLifecycleScalatest extends BeforeAndAfterAll { this: TestKitBase with Suite =>
-
-  import CassandraLifecycle._
-
-  def systemName: String
-
-  def cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource
-
-  override protected def beforeAll(): Unit = {
-    startCassandra(None, None, systemName, cassandraConfigResource)
-    awaitPersistenceInit(system)
-    super.beforeAll()
-  }
-
-  override protected def afterAll(): Unit = {
-    shutdown(system, verifySystemShutdown = true)
-    stopCassandra()
-    super.afterAll()
-  }
-}
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala
deleted file mode 100644
index 0e6f029..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
- */
-package akka.persistence.multidc.testkit
-
-import scala.annotation.tailrec
-import scala.concurrent.Future
-
-import com.typesafe.config.Config
-
-import akka.NotUsed
-import akka.actor.ExtendedActorSystem
-import akka.actor.ExtendedActorSystem
-import akka.annotation.InternalApi
-import akka.event.Logging
-import akka.persistence.query.ReadJournalProvider
-import akka.persistence.cassandra.query.EventsByPersistenceIdStage
-import akka.persistence.multidc.internal.CassandraReplicatedEventQuery
-import akka.persistence.multidc.internal.ReplicatedEventEnvelope
-import akka.stream.contrib.{ SwitchMode, Valve, ValveSwitch }
-import akka.stream.scaladsl.{ Keep, Source }
-
-@InternalApi private[testkit] class InterruptableCassandraReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
-
-  override val scaladslReadJournal: CassandraReplicatedEventQuery =
-    new InterruptableCassandraReplicatedEventQuery(system, config)
-
-  override val javadslReadJournal: CassandraReplicatedEventQuery =
-    scaladslReadJournal
-
-}
-
-/**
- * It is retrieved with:
- * {{{
- * val queries = PersistenceQuery(system).readJournalFor[InterruptableCassandraReplicatedEventQuery](CassandraReplicatedEventQeury.Identifier)
- * }}}
- */
-@InternalApi private[akka] class InterruptableCassandraReplicatedEventQuery(system: ExtendedActorSystem, config: Config)
-  extends CassandraReplicatedEventQuery(system, config) {
-
-  private val log = Logging(system, getClass)
-  private var sourcesByDc = Map.empty[String, List[ValveSwitch]]
-  private var pendingDisable = Set.empty[String]
-  private var errors = Map.empty[String, ReplicatedEventEnvelope => Option[Throwable]]
-
-  override def replicatedEvents(persistenceId: String, fromDc: String, sequenceNr: Long): Source[ReplicatedEventEnvelope, Future[EventsByPersistenceIdStage.Control]] = {
-    super.replicatedEvents(persistenceId, fromDc, sequenceNr)
-      .viaMat(new Valve(SwitchMode.Open))(Keep.both)
-      .map { e =>
-        errors.foreach { case (_, f) => f(e).foreach(throw _) }
-        e
-      }
-      .mapMaterializedValue {
-        case (control, fs) =>
-          add(fromDc, fs)
-          control
-      }
-  }
-
-  def disableReplication(dc: String) = synchronized {
-    sourcesByDc.get(dc) match {
-      case Some(values) => values.foreach(_.flip(SwitchMode.Close))
-      case None         => pendingDisable += dc // not started yet
-    }
-  }
-
-  def enableReplication(dc: String) = synchronized {
-    sourcesByDc.get(dc) match {
-      case Some(values) => values.foreach(_.flip(SwitchMode.Open))
-      case None         => pendingDisable -= dc
-    }
-
-  }
-
-  def enableAll(): Unit = synchronized {
-    sourcesByDc.keys.foreach(enableReplication)
-    pendingDisable = Set.empty
-  }
-
-  private def add(dc: String, fs: Future[ValveSwitch]): Unit = {
-    implicit val ec = system.dispatcher
-    fs.onSuccess { case switch => add(dc, switch) }
-  }
-
-  private def add(dc: String, switch: ValveSwitch): Unit = synchronized {
-    val oldList = sourcesByDc.getOrElse(dc, Nil)
-    sourcesByDc = sourcesByDc.updated(dc, switch :: oldList)
-    if (pendingDisable(dc)) {
-      switch.flip(SwitchMode.Close)
-      pendingDisable -= dc
-    }
-  }
-
-  def addErrorFilter(key: String)(f: ReplicatedEventEnvelope => Option[Throwable]): Unit = synchronized {
-    errors = errors.updated(key, f)
-  }
-
-  def removeErrorFilter(key: String): Unit = synchronized {
-    errors = errors - key
-  }
-
-  def removeAllErrorFilters(): Unit = synchronized {
-    errors = Map.empty
-  }
-
-}
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org