You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2022/10/31 09:35:05 UTC
[incubator-pekko-samples] 01/01: Use persistence-multi-dc-testkit
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch akka-sample-persistence-multi-dc-scala
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 24003fafca2e062b2adfb23aee6316476bf42d0e
Author: Arnout Engelen <ar...@bzzt.net>
AuthorDate: Wed Oct 25 17:08:15 2017 +0200
Use persistence-multi-dc-testkit
---
akka-sample-persistence-multi-dc-scala/build.sbt | 10 +-
.../src/test/resources/logback-test.xml | 18 ---
.../persistence/multidc/testkit/BaseSpec.scala | 131 ---------------------
.../multidc/testkit/CassandraLifecycle.scala | 107 -----------------
...InterruptableCassandraReadJournalProvider.scala | 108 -----------------
5 files changed, 4 insertions(+), 370 deletions(-)
diff --git a/akka-sample-persistence-multi-dc-scala/build.sbt b/akka-sample-persistence-multi-dc-scala/build.sbt
index 8a1486b..27fa2b0 100644
--- a/akka-sample-persistence-multi-dc-scala/build.sbt
+++ b/akka-sample-persistence-multi-dc-scala/build.sbt
@@ -8,13 +8,11 @@ resolvers += "com-mvn" at "https://repo.lightbend.com/commercial-releases/"
resolvers += Resolver.url("com-ivy",
url("https://repo.lightbend.com/commercial-releases/"))(Resolver.ivyStylePatterns)
+val persistenceMultiDcVersion = "1.1-M4+3-667a6ef6"
+
libraryDependencies ++= Seq(
- "com.lightbend.akka" %% "akka-persistence-multi-dc" % "1.1-M4",
- // TODO replace with akka-persistence-multi-dc-testkit and move test infra there
- "com.typesafe.akka" %% "akka-testkit" % "2.5.6" % "test",
- // TODO make dependency of akka-persistence-multi-dc-testkit
- "com.typesafe.akka" %% "akka-stream-contrib" % "0.8" % "test",
- "com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.58" % "test",
+ "com.lightbend.akka" %% "akka-persistence-multi-dc" % persistenceMultiDcVersion,
+ "com.lightbend.akka" %% "akka-persistence-multi-dc-testkit" % persistenceMultiDcVersion % "test",
"org.scalatest" %% "scalatest" % "3.0.1" % "test"
)
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml b/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml
deleted file mode 100644
index e75548e..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <target>System.out</target>
- <encoder>
- <pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - %m%n%xException</pattern>
- </encoder>
- </appender>
-
- <logger name="org.apache.cassandra" level="ERROR" />
- <logger name="com.datastax.driver.core" level="WARN" />
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- </root>
-
-</configuration>
\ No newline at end of file
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala
deleted file mode 100644
index 2f99118..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/BaseSpec.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-package akka.persistence.multidc.testkit
-
-import com.typesafe.config.Config
-import com.typesafe.config.ConfigFactory
-
-import akka.actor.ActorRef
-import akka.actor.ActorSystem
-import akka.cluster.Cluster
-import akka.persistence.multidc.PersistenceMultiDcSettings
-import akka.persistence.query.PersistenceQuery
-import akka.testkit.ImplicitSender
-import akka.testkit.TestKit
-import akka.testkit.TestProbe
-
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.Matchers
-import org.scalatest.WordSpecLike
-import org.scalatest.BeforeAndAfter
-
-import akka.persistence.multidc.testkit._
-import akka.persistence.multidc.internal.CassandraReplicatedEventQuery
-import akka.persistence.multidc.internal.ReplicatedEventEnvelope
-
-object BaseSpec {
- val clusterConfig = ConfigFactory.parseString("""
- akka.actor.provider = "cluster"
- akka.remote.netty.tcp.port = 0
- akka.remote.artery.canonical.port = 0
- akka.remote.artery.canonical.hostname = 127.0.0.1
- akka.cluster.jmx.multi-mbeans-in-same-jvm = on
-
- # inceasing probability due to issue https://github.com/akka/akka/issues/23803
- akka.cluster.multi-data-center.cross-data-center-gossip-probability = 0.5
-
- # speed up joining and such
- akka.cluster.gossip-interval = 500 ms
- """)
-
- def createFirstSystem(name: String): ActorSystem = createFirstSystem(name, ConfigFactory.empty())
- def createFirstSystem(name: String, cfg: Config): ActorSystem = ActorSystem(
- name,
- cfg.withFallback(ConfigFactory.parseString(s"""
- akka.loglevel = INFO
- akka.cluster.multi-data-center.self-data-center = DC-A
- akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
- cassandra-journal-multi-dc.keyspace=$name
- cassandra-snapshot-store.keyspace=${name}Snapshot
- cassandra-query-journal-multi-dc.class = "${classOf[InterruptableCassandraReadJournalProvider].getName}"
- """)).withFallback(BaseSpec.clusterConfig).withFallback(CassandraLifecycle.config))
-
- def otherSystemSettings: Config =
- ConfigFactory.parseString("""
- akka.cluster.multi-data-center.self-data-center = DC-B
- akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
- """)
-
- def createOtherSystem(firstSystem: ActorSystem) =
- ActorSystem(
- firstSystem.name,
- ConfigFactory.parseString("""
- akka.cluster.multi-data-center.self-data-center = DC-B
- akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
- """).withFallback(firstSystem.settings.config))
-
- def createThirdSystem(firstSystem: ActorSystem) =
- ActorSystem(
- firstSystem.name,
- ConfigFactory.parseString("""
- akka.cluster.multi-data-center.self-data-center = DC-C
- akka.persistence.multi-data-center.all-data-centers = [ DC-A, DC-B, DC-C ]
- """).withFallback(firstSystem.settings.config))
-
-}
-
-// TODO move a lot of this to a trait in testkit?
-abstract class BaseSpec(name: String, cfg: Config = ConfigFactory.empty)
- extends TestKit(BaseSpec.createFirstSystem(name, cfg))
- with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfter with CassandraLifecycleScalatest {
- import BaseSpec._
-
- val otherSystem = createOtherSystem(system)
- val thirdSystem = createThirdSystem(system)
-
- val settings = PersistenceMultiDcSettings(system)
- val otherSettings = PersistenceMultiDcSettings(otherSystem)
- val thirdSettings = PersistenceMultiDcSettings(thirdSystem)
-
- private def queries(sys: ActorSystem): InterruptableCassandraReplicatedEventQuery =
- PersistenceQuery(sys).readJournalFor[InterruptableCassandraReplicatedEventQuery](CassandraReplicatedEventQuery.Identifier)
-
- def disableReplication(from: ActorSystem, to: ActorSystem) =
- queries(to).disableReplication(Cluster(from).selfDataCenter)
-
- def enableReplication(from: ActorSystem, to: ActorSystem) =
- queries(to).enableReplication(Cluster(from).selfDataCenter)
-
- private[multidc] def addErrorFilter(sys: ActorSystem, key: String)(f: ReplicatedEventEnvelope => Option[Throwable]): Unit =
- queries(sys).addErrorFilter(key)(f)
-
- def removeErrorFilter(sys: ActorSystem, key: String): Unit =
- queries(sys).removeErrorFilter(key)
-
- override def systemName: String = name
-
- after {
- Seq(system, otherSystem, thirdSystem).foreach { sys =>
- queries(sys).enableAll()
- }
- }
-
- override protected def afterAll(): Unit = {
- shutdown(otherSystem)
- shutdown(thirdSystem)
- shutdown()
- super.afterAll()
- }
-
- def stopA(ref: ActorRef): Unit =
- stop(ref, system)
-
- def stopB(ref: ActorRef): Unit =
- stop(ref, otherSystem)
-
- def stop(ref: ActorRef, sys: ActorSystem): Unit = {
- val probe = TestProbe()(sys)
- probe.watch(ref)
- sys.stop(ref)
- probe.expectTerminated(ref)
- }
-
-}
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala
deleted file mode 100644
index caac3a4..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/CassandraLifecycle.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
- */
-package akka.persistence.multidc.testkit
-
-import java.io.File
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration._
-import akka.actor.ActorSystem
-import akka.actor.Props
-import akka.persistence.PersistentActor
-import akka.persistence.cassandra.testkit.CassandraLauncher
-import akka.testkit.TestKitBase
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.Suite
-
-object CassandraLifecycle {
-
- val config = ConfigFactory.parseString(s"""
- akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
- cassandra-journal-multi-dc.port = ${CassandraLauncher.randomPort}
- cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
- cassandra-journal-multi-dc.circuit-breaker.call-timeout = 30s
- akka.test.single-expect-default = 10s
- """)
-
- def awaitPersistenceInit(system: ActorSystem, journalPluginId: String = "", snapshotPluginId: String = ""): Unit = {
- val probe = TestProbe()(system)
- val t0 = System.nanoTime()
- var n = 0
- probe.within(45.seconds) {
- probe.awaitAssert {
- n += 1
- system.actorOf(Props(classOf[AwaitPersistenceInit], journalPluginId, snapshotPluginId), "persistenceInit" + n).tell("hello", probe.ref)
- probe.expectMsg(5.seconds, "hello")
- system.log.debug("awaitPersistenceInit took {} ms {}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0), system.name)
- }
- }
- }
-
- class AwaitPersistenceInit(
- override val journalPluginId: String,
- override val snapshotPluginId: String) extends PersistentActor {
- def persistenceId: String = "persistenceInit"
-
- def receiveRecover: Receive = {
- case _ =>
- }
-
- def receiveCommand: Receive = {
- case msg =>
- persist(msg) { _ =>
- sender() ! msg
- context.stop(self)
- }
- }
- }
-
- def startCassandra(systemName: String, cassandraConfigResource: String): Unit =
- startCassandra(None, None, systemName, cassandraConfigResource)
-
- def startCassandra(host: Option[String], port: Option[Int],
- systemName: String,
- cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource): Unit = {
- val cassandraDirectory = new File("target/" + systemName)
- CassandraLauncher.start(
- cassandraDirectory,
- configResource = cassandraConfigResource,
- clean = true,
- port = port.getOrElse(0),
- CassandraLauncher.classpathForResources("logback-test.xml"),
- host)
- }
-
- def stopCassandra(): Unit = {
- CassandraLauncher.stop()
- }
-
- def awaitPersistenceInit(system: ActorSystem): Unit = {
- CassandraLifecycle.awaitPersistenceInit(system, journalPluginId = "cassandra-journal-multi-dc")
- }
-
-}
-
-trait CassandraLifecycleScalatest extends BeforeAndAfterAll { this: TestKitBase with Suite =>
-
- import CassandraLifecycle._
-
- def systemName: String
-
- def cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource
-
- override protected def beforeAll(): Unit = {
- startCassandra(None, None, systemName, cassandraConfigResource)
- awaitPersistenceInit(system)
- super.beforeAll()
- }
-
- override protected def afterAll(): Unit = {
- shutdown(system, verifySystemShutdown = true)
- stopCassandra()
- super.afterAll()
- }
-}
diff --git a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala b/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala
deleted file mode 100644
index 0e6f029..0000000
--- a/akka-sample-persistence-multi-dc-scala/src/test/scala/akka/persistence/multidc/testkit/InterruptableCassandraReadJournalProvider.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
- */
-package akka.persistence.multidc.testkit
-
-import scala.annotation.tailrec
-import scala.concurrent.Future
-
-import com.typesafe.config.Config
-
-import akka.NotUsed
-import akka.actor.ExtendedActorSystem
-import akka.actor.ExtendedActorSystem
-import akka.annotation.InternalApi
-import akka.event.Logging
-import akka.persistence.query.ReadJournalProvider
-import akka.persistence.cassandra.query.EventsByPersistenceIdStage
-import akka.persistence.multidc.internal.CassandraReplicatedEventQuery
-import akka.persistence.multidc.internal.ReplicatedEventEnvelope
-import akka.stream.contrib.{ SwitchMode, Valve, ValveSwitch }
-import akka.stream.scaladsl.{ Keep, Source }
-
-@InternalApi private[testkit] class InterruptableCassandraReadJournalProvider(system: ExtendedActorSystem, config: Config) extends ReadJournalProvider {
-
- override val scaladslReadJournal: CassandraReplicatedEventQuery =
- new InterruptableCassandraReplicatedEventQuery(system, config)
-
- override val javadslReadJournal: CassandraReplicatedEventQuery =
- scaladslReadJournal
-
-}
-
-/**
- * It is retrieved with:
- * {{{
- * val queries = PersistenceQuery(system).readJournalFor[InterruptableCassandraReplicatedEventQuery](CassandraReplicatedEventQeury.Identifier)
- * }}}
- */
-@InternalApi private[akka] class InterruptableCassandraReplicatedEventQuery(system: ExtendedActorSystem, config: Config)
- extends CassandraReplicatedEventQuery(system, config) {
-
- private val log = Logging(system, getClass)
- private var sourcesByDc = Map.empty[String, List[ValveSwitch]]
- private var pendingDisable = Set.empty[String]
- private var errors = Map.empty[String, ReplicatedEventEnvelope => Option[Throwable]]
-
- override def replicatedEvents(persistenceId: String, fromDc: String, sequenceNr: Long): Source[ReplicatedEventEnvelope, Future[EventsByPersistenceIdStage.Control]] = {
- super.replicatedEvents(persistenceId, fromDc, sequenceNr)
- .viaMat(new Valve(SwitchMode.Open))(Keep.both)
- .map { e =>
- errors.foreach { case (_, f) => f(e).foreach(throw _) }
- e
- }
- .mapMaterializedValue {
- case (control, fs) =>
- add(fromDc, fs)
- control
- }
- }
-
- def disableReplication(dc: String) = synchronized {
- sourcesByDc.get(dc) match {
- case Some(values) => values.foreach(_.flip(SwitchMode.Close))
- case None => pendingDisable += dc // not started yet
- }
- }
-
- def enableReplication(dc: String) = synchronized {
- sourcesByDc.get(dc) match {
- case Some(values) => values.foreach(_.flip(SwitchMode.Open))
- case None => pendingDisable -= dc
- }
-
- }
-
- def enableAll(): Unit = synchronized {
- sourcesByDc.keys.foreach(enableReplication)
- pendingDisable = Set.empty
- }
-
- private def add(dc: String, fs: Future[ValveSwitch]): Unit = {
- implicit val ec = system.dispatcher
- fs.onSuccess { case switch => add(dc, switch) }
- }
-
- private def add(dc: String, switch: ValveSwitch): Unit = synchronized {
- val oldList = sourcesByDc.getOrElse(dc, Nil)
- sourcesByDc = sourcesByDc.updated(dc, switch :: oldList)
- if (pendingDisable(dc)) {
- switch.flip(SwitchMode.Close)
- pendingDisable -= dc
- }
- }
-
- def addErrorFilter(key: String)(f: ReplicatedEventEnvelope => Option[Throwable]): Unit = synchronized {
- errors = errors.updated(key, f)
- }
-
- def removeErrorFilter(key: String): Unit = synchronized {
- errors = errors - key
- }
-
- def removeAllErrorFilters(): Unit = synchronized {
- errors = Map.empty
- }
-
-}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org