Posted to commits@pekko.apache.org by nv...@apache.org on 2023/03/21 20:52:03 UTC
[incubator-pekko-persistence-cassandra] branch main updated: Fixes compile warnings (#26)
This is an automated email from the ASF dual-hosted git repository.
nvollmar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-persistence-cassandra.git
The following commit(s) were added to refs/heads/main by this push:
new 6b87ee1 Fixes compile warnings (#26)
6b87ee1 is described below
commit 6b87ee1b8703f03d1a4761aa50fe29898f786ab0
Author: Nicolas Vollmar <nv...@gmail.com>
AuthorDate: Tue Mar 21 21:51:58 2023 +0100
Fixes compile warnings (#26)
* Fixes compile warnings
* Adds missing headers
---
.../cassandra/testkit/CassandraLauncher.scala | 5 +-
.../pekko/persistence/cassandra/Extractors.scala | 6 +-
.../persistence/cassandra/journal/TagWriter.scala | 6 +-
.../query/EventsByPersistenceIdStage.scala | 27 +++---
.../cassandra/query/EventsByTagStage.scala | 48 ++++------
.../query/scaladsl/CassandraReadJournal.scala | 60 ++++++------
.../snapshot/CassandraSnapshotStore.scala | 43 ++++-----
.../cassandra/EventsByTagMultiJvmSpec.scala | 10 +-
.../journal/CassandraIntegrationSpec.scala | 104 ++++++++++-----------
.../cassandra/journal/PersistAllSpec.scala | 10 +-
.../cassandra/journal/TagScanningSpec.scala | 7 +-
.../query/EventsByPersistenceIdSpec.scala | 61 ++++++------
.../ClusterShardingQuickTerminationSpec.scala | 3 +-
.../test/java/jdoc/cleanup/CleanupDocExample.java | 10 +-
.../test/scala/doc/cleanup/CleanupDocExample.scala | 9 ++
dse-test/src/test/resources/application.conf | 2 +
16 files changed, 207 insertions(+), 204 deletions(-)
diff --git a/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala b/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
index 2fa8ddd..7cee643 100644
--- a/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
+++ b/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
@@ -169,11 +169,14 @@ object CassandraLauncher {
resources
.map { resource =>
this.getClass.getClassLoader.getResource(resource) match {
- case null => sys.error("Resource not found: " + resource)
+ case null =>
+ sys.error("Resource not found: " + resource)
case fileUrl if fileUrl.getProtocol == "file" =>
new File(URI.create(fileUrl.toString.stripSuffix(resource))).getCanonicalPath
case jarUrl if jarUrl.getProtocol == "jar" =>
new File(URI.create(jarUrl.getPath.takeWhile(_ != '!'))).getCanonicalPath
+ case _ =>
+ sys.error("Resource not supported: " + resource)
}
}
.distinct
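
[Editor's note] For readers skimming the hunk above: the launcher maps each classpath resource to a canonical file-system path by URL protocol, and the new wildcard turns an unhandled protocol into a descriptive error instead of a runtime MatchError. A self-contained sketch of the pattern, simplified from the hunk (the object and method names are illustrative, not the project's code):

    import java.io.File
    import java.net.{ URI, URL }

    object ResourcePathSketch {
      // Resolve a classpath-resource URL to the directory or jar containing
      // it; an unknown protocol now fails loudly instead of with MatchError.
      def resourceRoot(url: URL, resource: String): String = url match {
        case null =>
          sys.error("Resource not found: " + resource)
        case fileUrl if fileUrl.getProtocol == "file" =>
          new File(URI.create(fileUrl.toString.stripSuffix(resource))).getCanonicalPath
        case jarUrl if jarUrl.getProtocol == "jar" =>
          new File(URI.create(jarUrl.getPath.takeWhile(_ != '!'))).getCanonicalPath
        case other =>
          sys.error("Resource not supported: " + other)
      }
    }
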
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/Extractors.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/Extractors.scala
index a7195a2..4a44391 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/Extractors.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/Extractors.scala
@@ -184,7 +184,7 @@ import org.apache.pekko.persistence.query.TimeBasedUUID
def deserializeEvent(): Future[PersistentRepr] = {
ed.deserializeEvent(row, async).map {
- case DeserializedEvent(payload, metadata) =>
+ case DeserializedEvent(payload, metadata: OptionVal[Any]) =>
val repr = PersistentRepr(
payload,
sequenceNr = row.getLong("sequence_nr"),
@@ -194,8 +194,8 @@ import org.apache.pekko.persistence.query.TimeBasedUUID
sender = null,
writerUuid = row.getString("writer_uuid"))
metadata match {
- case OptionVal.None => repr
- case OptionVal.Some(m) => repr.withMetadata(m)
+ case OptionVal.None => repr
+ case some => repr.withMetadata(some.x)
}
}
}
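
[Editor's note] The same OptionVal rewrite recurs in TagWriter.scala and CassandraReadJournal.scala below: pekko's OptionVal is an AnyVal wrapper, and extracting through OptionVal.Some(...) can draw unchecked-pattern warnings on an erased type parameter, whereas ruling out OptionVal.None first leaves a defined value whose payload is read directly via .x. A minimal sketch, assuming pekko-actor's org.apache.pekko.util.OptionVal (the method name is illustrative):

    import org.apache.pekko.util.OptionVal

    object OptionValSketch {
      // After the None case is ruled out, `some` is a defined OptionVal and
      // its payload is available as `.x`, avoiding the extractor entirely.
      def describeMetadata(metadata: OptionVal[Any]): String = metadata match {
        case OptionVal.None => "no metadata"
        case some           => "metadata: " + some.x
      }
    }
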
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala
index 5d370c3..ebeafea 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/TagWriter.scala
@@ -236,8 +236,8 @@ import scala.util.{ Failure, Success, Try }
val nextBuffer = buffer.writeComplete()
buffer.nextBatch.foreach { write =>
write.ack match {
- case OptionVal.None =>
- case OptionVal.Some(ref) => ref ! Done
+ case OptionVal.None =>
+ case ref => ref.x ! Done
}
}
summary.foreach {
@@ -274,7 +274,7 @@ import scala.util.{ Failure, Success, Try }
sendPubsubNotification()
doneNotify.foreach(_ ! FlushComplete)
- case TagWriteFailed(t, events) =>
+ case TagWriteFailed(t, _) =>
log.warning(
"Writing tags has failed. This means that any eventsByTag query will be out of date. " +
"The write will be retried. Reason {}",
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
index aebc15c..be83255 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdStage.scala
@@ -13,24 +13,21 @@
package org.apache.pekko.persistence.cassandra.query
-import java.lang.{ Long => JLong }
-import java.util.concurrent.ThreadLocalRandom
-
+import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql._
import org.apache.pekko.Done
import org.apache.pekko.annotation.InternalApi
-import org.apache.pekko.stream.{ Attributes, Outlet, SourceShape }
+import org.apache.pekko.persistence.cassandra.PluginSettings
import org.apache.pekko.stream.stage._
-import com.datastax.oss.driver.api.core.cql._
-import scala.annotation.tailrec
-import scala.concurrent.{ ExecutionContext, Future, Promise }
-import scala.concurrent.duration.{ FiniteDuration, _ }
-import scala.util.{ Failure, Success, Try }
+import org.apache.pekko.stream.{ Attributes, Outlet, SourceShape }
-import com.datastax.oss.driver.api.core.CqlSession
-import scala.annotation.nowarn
+import java.lang.{ Long => JLong }
+import java.util.concurrent.ThreadLocalRandom
+import scala.annotation.{ nowarn, tailrec }
import scala.compat.java8.FutureConverters._
-
-import org.apache.pekko.persistence.cassandra.PluginSettings
+import scala.concurrent.duration.{ FiniteDuration, _ }
+import scala.concurrent.{ ExecutionContext, Future, Promise }
+import scala.util.{ Failure, Success, Try }
/**
* INTERNAL API
@@ -123,8 +120,7 @@ import org.apache.pekko.persistence.cassandra.PluginSettings
extends GraphStageWithMaterializedValue[SourceShape[Row], EventsByPersistenceIdStage.Control] {
import EventsByPersistenceIdStage._
- import settings.querySettings
- import settings.journalSettings
+ import settings.{ journalSettings, querySettings }
val out: Outlet[Row] = Outlet("EventsByPersistenceId.out")
override val shape: SourceShape[Row] = SourceShape(out)
@@ -288,6 +284,7 @@ import org.apache.pekko.persistence.cassandra.PluginSettings
override protected def onTimer(timerKey: Any): Unit = timerKey match {
case Continue => continue()
case LookForMissingSeqNr => lookForMissingSeqNr()
+ case o => throw new IllegalStateException("Unexpected timerKey: " + o)
}
def continue(): Unit =
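
[Editor's note] The new wildcard in onTimer mirrors a change applied again in EventsByTagStage below: timerKey is typed Any, so the compiler cannot prove the match total, and an explicit fallback converts an unexpected key into a descriptive IllegalStateException rather than a bare MatchError. A stand-alone sketch (the string keys are illustrative stand-ins for the stage's case objects):

    object TimerSketch {
      // Simplified timer dispatch: known keys route to their handlers,
      // anything else fails fast with the offending key in the message.
      def onTimer(timerKey: Any): Unit = timerKey match {
        case "Continue"            => println("continue polling")
        case "LookForMissingSeqNr" => println("look for missing sequence nr")
        case o                     => throw new IllegalStateException("Unexpected timerKey: " + o)
      }
    }
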
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
index 48e67cb..c670df5 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagStage.scala
@@ -13,36 +13,31 @@
package org.apache.pekko.persistence.cassandra.query
-import java.util.UUID
-import java.util.concurrent.ThreadLocalRandom
-
+import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql.{ AsyncResultSet, Row }
+import com.datastax.oss.driver.api.core.uuid.Uuids
+import org.apache.pekko.actor.Scheduler
import org.apache.pekko.annotation.InternalApi
+import org.apache.pekko.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator }
+import org.apache.pekko.persistence.cassandra.EventsByTagSettings.RetrySettings
import org.apache.pekko.persistence.cassandra._
+import org.apache.pekko.persistence.cassandra.journal.CassandraJournal._
import org.apache.pekko.persistence.cassandra.journal.TimeBucket
import org.apache.pekko.persistence.cassandra.query.EventsByTagStage._
-import org.apache.pekko.stream.stage.{ GraphStage, _ }
+import org.apache.pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
+import org.apache.pekko.stream.stage._
import org.apache.pekko.stream.{ Attributes, Outlet, SourceShape }
import org.apache.pekko.util.PrettyDuration._
+import org.apache.pekko.util.UUIDComparator
+import java.lang.{ Long => JLong }
+import java.util.UUID
+import java.util.concurrent.ThreadLocalRandom
import scala.annotation.tailrec
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration._
+import scala.compat.java8.FutureConverters._
+import scala.concurrent.duration.{ Duration, _ }
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future }
import scala.util.{ Failure, Success, Try }
-import java.lang.{ Long => JLong }
-
-import org.apache.pekko.actor.Scheduler
-import org.apache.pekko.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator }
-import org.apache.pekko.persistence.cassandra.EventsByTagSettings.RetrySettings
-import org.apache.pekko.persistence.cassandra.journal.CassandraJournal._
-import org.apache.pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
-import org.apache.pekko.util.UUIDComparator
-import com.datastax.oss.driver.api.core.CqlSession
-import com.datastax.oss.driver.api.core.cql.AsyncResultSet
-import com.datastax.oss.driver.api.core.cql.Row
-import com.datastax.oss.driver.api.core.uuid.Uuids
-
-import scala.compat.java8.FutureConverters._
/**
* INTERNAL API
@@ -227,8 +222,7 @@ import scala.compat.java8.FutureConverters._
scanner: TagViewSequenceNumberScanner)
extends GraphStage[SourceShape[UUIDRow]] {
- import settings.querySettings
- import settings.eventsByTagSettings
+ import settings.{ eventsByTagSettings, querySettings }
private val out: Outlet[UUIDRow] = Outlet("event.out")
private val verboseDebug = eventsByTagSettings.verboseDebug
@@ -449,12 +443,10 @@ import scala.compat.java8.FutureConverters._
}
override protected def onTimer(timerKey: Any): Unit = timerKey match {
- case _: QueryPoll =>
- continue()
- case PersistenceIdsCleanup =>
- cleanup()
- case ScanForDelayedEvents =>
- scanForDelayedEvents()
+ case _: QueryPoll => continue()
+ case PersistenceIdsCleanup => cleanup()
+ case ScanForDelayedEvents => scanForDelayedEvents()
+ case o => throw new IllegalStateException("Unexpected timerKey: " + o)
}
override def onPull(): Unit = {
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
index a0cd916..3c4a12d 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala
@@ -13,45 +13,42 @@
package org.apache.pekko.persistence.cassandra.query.scaladsl
-import java.net.URLEncoder
-import java.util.UUID
-
-import org.apache.pekko.{ Done, NotUsed }
+import com.datastax.oss.driver.api.core.CqlSession
+import com.datastax.oss.driver.api.core.cql._
+import com.datastax.oss.driver.api.core.uuid.Uuids
+import com.typesafe.config.Config
import org.apache.pekko.actor.{ ActorSystem, ExtendedActorSystem }
import org.apache.pekko.annotation.InternalApi
import org.apache.pekko.event.Logging
-import org.apache.pekko.persistence.cassandra.journal.CassandraJournal.{ PersistenceId, Tag, TagPidSequenceNr }
-import org.apache.pekko.persistence.cassandra.journal._
-import org.apache.pekko.persistence.cassandra.Extractors
import org.apache.pekko.persistence.cassandra.Extractors.Extractor
+import org.apache.pekko.persistence.cassandra.{ CassandraStatements, Extractors, PluginSettings }
+import org.apache.pekko.persistence.cassandra.journal.CassandraJournal.{
+ DeserializedEvent,
+ PersistenceId,
+ Tag,
+ TagPidSequenceNr
+}
+import org.apache.pekko.persistence.cassandra.journal._
import org.apache.pekko.persistence.cassandra.query.EventsByTagStage.TagStageSession
import org.apache.pekko.persistence.cassandra.query._
import org.apache.pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal.EventByTagStatements
import org.apache.pekko.persistence.query._
import org.apache.pekko.persistence.query.scaladsl._
import org.apache.pekko.persistence.{ Persistence, PersistentRepr }
-import org.apache.pekko.stream.scaladsl.Flow
-import org.apache.pekko.stream.scaladsl.Source
+import org.apache.pekko.serialization.SerializationExtension
import org.apache.pekko.stream.ActorAttributes
-import org.apache.pekko.util.ByteString
-import com.datastax.oss.driver.api.core.cql._
-import com.typesafe.config.Config
+import org.apache.pekko.stream.connectors.cassandra.CassandraSessionSettings
+import org.apache.pekko.stream.connectors.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
+import org.apache.pekko.stream.scaladsl.{ Flow, Source }
+import org.apache.pekko.util.{ ByteString, OptionVal }
+import org.apache.pekko.{ Done, NotUsed }
+import java.net.URLEncoder
+import java.util.UUID
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
-import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
-import org.apache.pekko.persistence.cassandra.PluginSettings
-import org.apache.pekko.persistence.cassandra.CassandraStatements
-import org.apache.pekko.persistence.cassandra.journal.CassandraJournal
-import org.apache.pekko.persistence.cassandra.journal.CassandraJournal.DeserializedEvent
-import org.apache.pekko.serialization.SerializationExtension
-import org.apache.pekko.stream.connectors.cassandra.CassandraSessionSettings
-import org.apache.pekko.stream.connectors.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
-import org.apache.pekko.util.OptionVal
-import com.datastax.oss.driver.api.core.CqlSession
-import com.datastax.oss.driver.api.core.uuid.Uuids
object CassandraReadJournal {
@@ -125,8 +122,7 @@ class CassandraReadJournal protected (
private val settings = new PluginSettings(system, sharedConfig)
private val statements = new CassandraStatements(settings)
- import settings.querySettings
- import settings.eventsByTagSettings
+ import settings.{ eventsByTagSettings, querySettings }
if (eventsByTagSettings.eventualConsistency < 1.seconds) {
log.warning(
@@ -371,8 +367,8 @@ class CassandraReadJournal protected (
sender = null,
writerUuid = row.getString("writer_uuid")))
val reprWithMeta = metadata match {
- case OptionVal.None => repr
- case OptionVal.Some(metadata) => repr.withMetadata(metadata)
+ case OptionVal.None => repr
+ case metadata => repr.withMetadata(metadata.x)
}
UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, reprWithMeta)
}
@@ -446,9 +442,9 @@ class CassandraReadJournal protected (
def getSession: CqlSession = session.underlying().value.get.get
prepStmt.value match {
- case Some(Success(ps)) => source(getSession, ps)
- case Some(Failure(e)) => Source.failed(e)
- case None =>
+ case Some(util.Success(ps)) => source(getSession, ps)
+ case Some(util.Failure(e)) => Source.failed(e)
+ case None =>
// completed later
Source
.maybe[P]
@@ -471,9 +467,9 @@ class CassandraReadJournal protected (
def getSession: CqlSession = session.underlying().value.get.get
prepStmt.value match {
- case Some(Success(ps)) =>
+ case Some(util.Success(ps)) =>
source(getSession, ps).mapMaterializedValue(Future.successful)
- case Some(Failure(e)) =>
+ case Some(util.Failure(e)) =>
Source.failed(e).mapMaterializedValue(_ => Future.failed(e))
case None =>
// completed later
diff --git a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
index 259e649..ffef47b 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala
@@ -13,39 +13,30 @@
package org.apache.pekko.persistence.cassandra.snapshot
-import java.lang.{ Long => JLong }
-import java.nio.ByteBuffer
-import java.util.NoSuchElementException
-
-import org.apache.pekko.NotUsed
-
-import scala.collection.immutable
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.Failure
-import scala.util.Success
-import scala.util.control.NonFatal
-import org.apache.pekko.actor._
-import org.apache.pekko.pattern.pipe
-import org.apache.pekko.persistence._
-import org.apache.pekko.persistence.cassandra._
-import org.apache.pekko.persistence.serialization.Snapshot
-import org.apache.pekko.persistence.snapshot.SnapshotStore
-import org.apache.pekko.serialization.AsyncSerializer
-import org.apache.pekko.serialization.Serialization
-import org.apache.pekko.serialization.SerializationExtension
-import org.apache.pekko.serialization.Serializers
-import org.apache.pekko.stream.scaladsl.Sink
-import org.apache.pekko.stream.scaladsl.Source
-import org.apache.pekko.util.{ unused, OptionVal }
import com.datastax.oss.driver.api.core.cql._
import com.datastax.oss.protocol.internal.util.Bytes
import com.typesafe.config.Config
-import org.apache.pekko.Done
+import org.apache.pekko.{ Done, NotUsed }
+import org.apache.pekko.actor._
import org.apache.pekko.annotation.InternalApi
import org.apache.pekko.dispatch.ExecutionContexts
import org.apache.pekko.event.Logging
+import org.apache.pekko.pattern.pipe
+import org.apache.pekko.persistence._
+import org.apache.pekko.persistence.cassandra._
+import org.apache.pekko.persistence.serialization.Snapshot
+import org.apache.pekko.persistence.snapshot.SnapshotStore
+import org.apache.pekko.serialization.{ AsyncSerializer, Serialization, SerializationExtension, Serializers }
import org.apache.pekko.stream.connectors.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
+import org.apache.pekko.stream.scaladsl.{ Sink, Source }
+import org.apache.pekko.util.{ unused, OptionVal }
+
+import java.lang.{ Long => JLong }
+import java.nio.ByteBuffer
+import scala.collection.immutable
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success }
+import scala.util.control.NonFatal
/**
* INTERNAL API
diff --git a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
index 6b1bea6..4b5a242 100644
--- a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
+++ b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
@@ -9,22 +9,20 @@
package org.apache.pekko.cluster.persistence.cassandra
-import java.io.File
-import java.time.{ LocalDateTime, ZoneOffset }
-
-import org.apache.pekko.persistence.cassandra.CassandraLifecycle
+import com.typesafe.config.ConfigFactory
import org.apache.pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
-import org.apache.pekko.persistence.cassandra.query._
+import org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher
import org.apache.pekko.persistence.journal.Tagged
import org.apache.pekko.persistence.query.{ NoOffset, PersistenceQuery }
import org.apache.pekko.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import org.apache.pekko.stream.ActorMaterializer
import org.apache.pekko.stream.testkit.TestSubscriber
import org.apache.pekko.stream.testkit.scaladsl.TestSink
-import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
+import java.io.File
+
object EventsByTagMultiJvmSpec extends MultiNodeConfig {
// No way to start and distribute the port so hard coding
final val CassPort = 9142
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraIntegrationSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraIntegrationSpec.scala
index 0f1c642..445c280 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraIntegrationSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/CassandraIntegrationSpec.scala
@@ -82,7 +82,7 @@ object CassandraIntegrationSpec {
def receiveRecover: Receive = {
case SnapshotOffer(_, snapshot: String) =>
last = snapshot
- probe ! s"offered-${last}"
+ probe ! s"offered-$last"
case payload: String =>
handle(payload)
}
@@ -91,7 +91,7 @@ object CassandraIntegrationSpec {
case "snap" =>
saveSnapshot(last)
case SaveSnapshotSuccess(_) =>
- probe ! s"snapped-${last}"
+ probe ! s"snapped-$last"
case payload: String =>
persist(payload)(handle)
case DeleteTo(sequenceNr) =>
@@ -101,15 +101,15 @@ object CassandraIntegrationSpec {
def handle: Receive = {
case payload: String =>
- last = s"${payload}-${lastSequenceNr}"
- probe ! s"updated-${last}"
+ last = s"$payload-$lastSequenceNr"
+ probe ! s"updated-$last"
}
}
class ProcessorCNoRecover(override val persistenceId: String, probe: ActorRef, recoverConfig: Recovery)
extends ProcessorC(persistenceId, probe) {
- override def recovery = recoverConfig
- override def preStart() = ()
+ override def recovery: Recovery = recoverConfig
+ override def preStart(): Unit = ()
}
}
@@ -134,47 +134,47 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
val deleteProbe = TestProbe()
subscribeToRangeDeletion(deleteProbe)
- val processor1 = system.actorOf(Props(classOf[ProcessorA], persistenceId, self))
+ val processor1 = system.actorOf(Props(new ProcessorA(persistenceId, self)))
(1L to 16L).foreach { i =>
- processor1 ! s"a-${i}"
- expectMsgAllOf(s"a-${i}", i, false)
+ processor1 ! s"a-$i"
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
processor1 ! DeleteTo(3L)
awaitRangeDeletion(deleteProbe)
- system.actorOf(Props(classOf[ProcessorA], persistenceId, self))
+ system.actorOf(Props(new ProcessorA(persistenceId, self)))
(4L to 16L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, true)
+ expectMsgAllOf[Any](s"a-$i", i, true)
}
processor1 ! DeleteTo(7L)
awaitRangeDeletion(deleteProbe)
- system.actorOf(Props(classOf[ProcessorA], persistenceId, self))
+ system.actorOf(Props(new ProcessorA(persistenceId, self)))
(8L to 16L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, true)
+ expectMsgAllOf[Any](s"a-$i", i, true)
}
}
"A Cassandra journal" must {
"write and replay messages" in {
val persistenceId = UUID.randomUUID().toString
- val processor1 = system.actorOf(Props(classOf[ProcessorA], persistenceId, self), "p1")
+ val processor1 = system.actorOf(Props(new ProcessorA(persistenceId, self)), "p1")
(1L to 16L).foreach { i =>
- processor1 ! s"a-${i}"
- expectMsgAllOf(s"a-${i}", i, false)
+ processor1 ! s"a-$i"
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
stopAndWaitUntilTerminated(processor1)
- val processor2 = system.actorOf(Props(classOf[ProcessorA], persistenceId, self), "p2")
+ val processor2 = system.actorOf(Props(new ProcessorA(persistenceId, self)), "p2")
(1L to 16L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, true)
+ expectMsgAllOf[Any](s"a-$i", i, true)
}
processor2 ! "b"
- expectMsgAllOf("b", 17L, false)
+ expectMsgAllOf[Any]("b", 17L, false)
}
"not replay range-deleted messages" in {
@@ -184,63 +184,63 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
"write and replay with persistAll greater than partition size skipping whole partition" in {
val persistenceId = UUID.randomUUID().toString
- val processorAtomic = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, self))
+ val processorAtomic = system.actorOf(Props(new ProcessorAtomic(persistenceId, self)))
// more than 2 partitions not supported, will fail the write
processorAtomic ! List("a-1", "a-2", "a-3", "a-4", "a-5", "a-6")
(1L to 6L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, false)
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
stopAndWaitUntilTerminated(processorAtomic)
val testProbe = TestProbe()
- val processor2 = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, testProbe.ref))
+ val processor2 = system.actorOf(Props(new ProcessorAtomic(persistenceId, testProbe.ref)))
(1L to 6L).foreach { i =>
- testProbe.expectMsgAllOf(s"a-${i}", i, true)
+ testProbe.expectMsgAllOf[Any](s"a-$i", i, true)
}
processor2
}
"write and replay with persistAll greater than partition size skipping part of a partition" in {
val persistenceId = UUID.randomUUID().toString
- val processorAtomic = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, self))
+ val processorAtomic = system.actorOf(Props(new ProcessorAtomic(persistenceId, self)))
processorAtomic ! List("a-1", "a-2", "a-3")
(1L to 3L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, false)
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
processorAtomic ! List("a-4", "a-5", "a-6")
(4L to 6L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, false)
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
stopAndWaitUntilTerminated(processorAtomic)
val testProbe = TestProbe()
- val processor2 = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, testProbe.ref))
+ val processor2 = system.actorOf(Props(new ProcessorAtomic(persistenceId, testProbe.ref)))
(1L to 6L).foreach { i =>
- testProbe.expectMsgAllOf(s"a-${i}", i, true)
+ testProbe.expectMsgAllOf[Any](s"a-$i", i, true)
}
processor2
}
"write and replay with persistAll less than partition size" in {
val persistenceId = UUID.randomUUID().toString
- val processorAtomic = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, self))
+ val processorAtomic = system.actorOf(Props(new ProcessorAtomic(persistenceId, self)))
processorAtomic ! List("a-1", "a-2", "a-3", "a-4")
(1L to 4L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, false)
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
stopAndWaitUntilTerminated(processorAtomic)
- system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, self))
+ system.actorOf(Props(new ProcessorAtomic(persistenceId, self)))
(1L to 4L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, true)
+ expectMsgAllOf[Any](s"a-$i", i, true)
}
}
@@ -248,11 +248,11 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
val persistenceId = UUID.randomUUID().toString
val deleteProbe = TestProbe()
subscribeToRangeDeletion(deleteProbe)
- val processorAtomic = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, self))
+ val processorAtomic = system.actorOf(Props(new ProcessorAtomic(persistenceId, self)))
processorAtomic ! List("a-1", "a-2", "a-3", "a-4", "a-5", "a-6")
(1L to 6L).foreach { i =>
- expectMsgAllOf(s"a-${i}", i, false)
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
processorAtomic ! DeleteTo(5L)
awaitRangeDeletion(deleteProbe)
@@ -260,15 +260,15 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
stopAndWaitUntilTerminated(processorAtomic)
val testProbe = TestProbe()
- system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, testProbe.ref))
- testProbe.expectMsgAllOf(s"a-6", 6, true)
+ system.actorOf(Props(new ProcessorAtomic(persistenceId, testProbe.ref)))
+ testProbe.expectMsgAllOf[Any](s"a-6", 6, true)
}
}
"A processor" must {
"recover from a snapshot with follow-up messages" in {
val persistenceId = UUID.randomUUID().toString
- val processor1 = system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ val processor1 = system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
processor1 ! "a"
expectMsg("updated-a-1")
processor1 ! "snap"
@@ -278,26 +278,26 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
stopAndWaitUntilTerminated(processor1)
- system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
expectMsg("offered-a-1")
expectMsg("updated-b-2")
}
"recover from a snapshot with follow-up messages and an upper bound" in {
val persistenceId = UUID.randomUUID().toString
- val processor1 = system.actorOf(Props(classOf[ProcessorCNoRecover], persistenceId, testActor, Recovery()))
+ val processor1 = system.actorOf(Props(new ProcessorCNoRecover(persistenceId, testActor, Recovery())))
processor1 ! "a"
expectMsg("updated-a-1")
processor1 ! "snap"
expectMsg("snapped-a-1")
(2L to 7L).foreach { i =>
processor1 ! "a"
- expectMsg(s"updated-a-${i}")
+ expectMsg(s"updated-a-$i")
}
stopAndWaitUntilTerminated(processor1)
val processor2 =
- system.actorOf(Props(classOf[ProcessorCNoRecover], persistenceId, testActor, Recovery(toSequenceNr = 3L)))
+ system.actorOf(Props(new ProcessorCNoRecover(persistenceId, testActor, Recovery(toSequenceNr = 3L))))
expectMsg("offered-a-1")
expectMsg("updated-a-2")
expectMsg("updated-a-3")
@@ -306,7 +306,7 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
}
"recover from a snapshot without follow-up messages inside a partition" in {
val persistenceId = UUID.randomUUID().toString
- val processor1 = system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ val processor1 = system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
processor1 ! "a"
expectMsg("updated-a-1")
processor1 ! "snap"
@@ -314,24 +314,24 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
stopAndWaitUntilTerminated(processor1)
- val processor2 = system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ val processor2 = system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
expectMsg("offered-a-1")
processor2 ! "b"
expectMsg("updated-b-2")
}
"recover from a snapshot without follow-up messages at a partition boundary (where next partition is invalid)" in {
val persistenceId = UUID.randomUUID().toString
- val processor1 = system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ val processor1 = system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
(1L to 5L).foreach { i =>
processor1 ! "a"
- expectMsg(s"updated-a-${i}")
+ expectMsg(s"updated-a-$i")
}
processor1 ! "snap"
expectMsg("snapped-a-5")
stopAndWaitUntilTerminated(processor1)
- val processor2 = system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ val processor2 = system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
expectMsg("offered-a-5")
processor2 ! "b"
expectMsg("updated-b-6")
@@ -341,10 +341,10 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
val deleteProbe = TestProbe()
subscribeToRangeDeletion(deleteProbe)
- val processor1 = system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ val processor1 = system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
(1L to 5L).foreach { i =>
processor1 ! "a"
- expectMsg(s"updated-a-${i}")
+ expectMsg(s"updated-a-$i")
}
processor1 ! "snap"
expectMsg("snapped-a-5")
@@ -357,7 +357,7 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
stopAndWaitUntilTerminated(processor1)
- val processor2 = system.actorOf(Props(classOf[ProcessorC], persistenceId, testActor))
+ val processor2 = system.actorOf(Props(new ProcessorC(persistenceId, testActor)))
expectMsg("offered-a-5")
processor2 ! "b"
expectMsg("updated-b-7") // no longer re-using sequence numbers
@@ -367,17 +367,17 @@ class CassandraIntegrationSpec extends CassandraSpec(config) with ImplicitSender
val deleteProbe = TestProbe()
subscribeToRangeDeletion(deleteProbe)
- val p = system.actorOf(Props(classOf[ProcessorA], persistenceId, self))
+ val p = system.actorOf(Props(new ProcessorA(persistenceId, self)))
p ! "a"
- expectMsgAllOf("a", 1L, false)
+ expectMsgAllOf[Any]("a", 1L, false)
p ! DeleteTo(1L)
awaitRangeDeletion(deleteProbe)
stopAndWaitUntilTerminated(p)
- val r = system.actorOf(Props(classOf[ProcessorA], persistenceId, self))
+ val r = system.actorOf(Props(new ProcessorA(persistenceId, self)))
r ! "b"
expectMsgAllOf("b", 2L, false) // no longer re-using sequence numbers
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/PersistAllSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/PersistAllSpec.scala
index 92d32d8..25727cb 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/PersistAllSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/PersistAllSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
object PersistAllSpec {
- val config = ConfigFactory.parseString(s"""
+ private val config = ConfigFactory.parseString(s"""
pekko.persistence.cassandra.journal.max-message-batch-size = 100
pekko.persistence.cassandra.journal.keyspace=PersistAllSpec
pekko.persistence.cassandra.snapshot.keyspace=PersistAllSpecSnapshot
@@ -67,21 +67,21 @@ class PersistAllSpec extends CassandraSpec(config) with ImplicitSender with AnyW
// reproducer of issue #869
"write and replay with persistAll greater max-message-batch-size" in {
val persistenceId = UUID.randomUUID().toString
- val processorAtomic = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, self))
+ val processorAtomic = system.actorOf(Props(new ProcessorAtomic(persistenceId, self)))
val N = 200
processorAtomic ! (1 to N).map(n => s"a-$n").toList
(1L to N).foreach { i =>
- expectMsgAllOf(s"a-$i", i, false)
+ expectMsgAllOf[Any](s"a-$i", i, false)
}
stopAndWaitUntilTerminated(processorAtomic)
val testProbe = TestProbe()
- val processor2 = system.actorOf(Props(classOf[ProcessorAtomic], persistenceId, testProbe.ref))
+ val processor2 = system.actorOf(Props(new ProcessorAtomic(persistenceId, testProbe.ref)))
(1L to N).foreach { i =>
- testProbe.expectMsgAllOf(s"a-$i", i, true)
+ testProbe.expectMsgAllOf[Any](s"a-$i", i, true)
}
processor2
}
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagScanningSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagScanningSpec.scala
index 31329c4..e250d4d 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagScanningSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/journal/TagScanningSpec.scala
@@ -13,12 +13,11 @@
package org.apache.pekko.persistence.cassandra.journal
-import org.apache.pekko.actor._
-import org.apache.pekko.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, TestTaggingActor }
import com.typesafe.config.ConfigFactory
+import org.apache.pekko.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, TestTaggingActor }
object TagScanningSpec {
- val config = ConfigFactory.parseString(s"""
+ private val config = ConfigFactory.parseString(s"""
pekko.persistence.cassandra.events-by-tag.enabled = on
pekko.persistence.cassandra.events-by-tag.scanning-flush-interval = 2s
pekko.persistence.cassandra.journal.replay-filter.mode = off
@@ -40,7 +39,7 @@ class TagScanningSpec extends CassandraSpec(TagScanningSpec.config) {
import scala.jdk.CollectionConverters._
val expected = (0 until nrActors).map(n => (s"$n".toInt, 1L)).toList
val scanning = cluster
- .execute(s"select * from ${journalName}.tag_scanning")
+ .execute(s"select * from $journalName.tag_scanning")
.all()
.asScala
.toList
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
index 44a5766..4a3e9eb 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByPersistenceIdSpec.scala
@@ -13,23 +13,22 @@
package org.apache.pekko.persistence.cassandra.query
-import java.util.UUID
-
+import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.ActorRef
import org.apache.pekko.persistence.cassandra.{ CassandraLifecycle, CassandraSpec }
+import org.apache.pekko.persistence.query.TimeBasedUUID
import org.apache.pekko.persistence.{ DeleteMessagesSuccess, PersistentRepr }
import org.apache.pekko.stream.KillSwitches
-import org.apache.pekko.stream.scaladsl.Keep
+import org.apache.pekko.stream.scaladsl.{ Keep, Sink }
import org.apache.pekko.stream.testkit.scaladsl.TestSink
-import com.typesafe.config.ConfigFactory
-import scala.concurrent.duration._
-
-import org.apache.pekko.persistence.query.TimeBasedUUID
-import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.util.UUIDComparator
+import org.scalatest.Inside
+
+import java.util.UUID
+import scala.concurrent.duration._
object EventsByPersistenceIdSpec {
- val config = ConfigFactory.parseString(s"""
+ private val config = ConfigFactory.parseString(s"""
pekko.persistence.cassandra.journal.target-partition-size = 15
pekko.persistence.cassandra.query.refresh-interval = 0.5s
pekko.persistence.cassandra.query.max-result-size-query = 2
@@ -38,9 +37,9 @@ object EventsByPersistenceIdSpec {
""").withFallback(CassandraLifecycle.config)
}
-class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.config) with DirectWriting {
+class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.config) with DirectWriting with Inside {
- val noMsgTimeout = 100.millis
+ private val noMsgTimeout = 100.millis
def setup(persistenceId: String, n: Int): ActorRef = {
val ref = system.actorOf(TestActor.props(persistenceId))
@@ -175,12 +174,16 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.
probe.request(5)
- val (event1, seqNr1, TimeBasedUUID(uuid1)) = probe.requestNext()
- event1 shouldBe "jo-1"
- seqNr1 shouldBe 1
- val (event2, seqNr2, TimeBasedUUID(uuid2)) = probe.requestNext()
- event2 shouldBe "jo-2"
- seqNr2 shouldBe 2
+ val uuid1 = inside(probe.requestNext()) { case (event1, seqNr1, TimeBasedUUID(uuid)) =>
+ event1 shouldBe "jo-1"
+ seqNr1 shouldBe 1
+ uuid
+ }
+ val uuid2 = inside(probe.requestNext()) { case (event2, seqNr2, TimeBasedUUID(uuid)) =>
+ event2 shouldBe "jo-2"
+ seqNr2 shouldBe 2
+ uuid
+ }
system.log.debug("Saw evt 1 and 2")
// 4 arrived out of order
@@ -197,17 +200,23 @@ class EventsByPersistenceIdSpec extends CassandraSpec(EventsByPersistenceIdSpec.
writeTestEvent(PersistentRepr("jo-3", 3L, "jo"))
system.log.debug("Wrote evt 3")
- val (event3, seqNr3, TimeBasedUUID(uuid3)) = probe.requestNext()
- event3 shouldBe "jo-3"
- seqNr3 shouldBe 3
+ val uuid3 = inside(probe.requestNext()) { case (event3, seqNr3, TimeBasedUUID(uuid)) =>
+ event3 shouldBe "jo-3"
+ seqNr3 shouldBe 3
+ uuid
+ }
- val (event4, seqNr4, TimeBasedUUID(uuid4)) = probe.requestNext()
- event4 shouldBe "jo-4"
- seqNr4 shouldBe 4
+ val uuid4 = inside(probe.requestNext()) { case (event4, seqNr4, TimeBasedUUID(uuid)) =>
+ event4 shouldBe "jo-4"
+ seqNr4 shouldBe 4
+ uuid
+ }
- val (event5, seqNr5, TimeBasedUUID(uuid5)) = probe.requestNext()
- event5 shouldBe "jo-5"
- seqNr5 shouldBe 5
+ val uuid5 = inside(probe.requestNext()) { case (event5, seqNr5, TimeBasedUUID(uuid)) =>
+ event5 shouldBe "jo-5"
+ seqNr5 shouldBe 5
+ uuid
+ }
UUIDComparator.comparator.compare(uuid1, uuid2) should be < 0
UUIDComparator.comparator.compare(uuid2, uuid4) should be < 0
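
[Editor's note] The inside blocks above replace destructuring vals such as val (event1, seqNr1, TimeBasedUUID(uuid1)) = probe.requestNext(). Binding a val through a refutable pattern draws a compiler warning because the match can fail at runtime; ScalaTest's Inside.inside makes the partial match explicit, returns the body's result, and reports a readable failure when the shape does not apply. A minimal sketch, assuming ScalaTest's org.scalatest.Inside (TimeStamped is an illustrative stand-in for pekko's TimeBasedUUID offset, so the sketch runs without a journal):

    import java.util.UUID
    import org.scalatest.Inside.inside
    import org.scalatest.matchers.should.Matchers._

    object InsideSketch {
      final case class TimeStamped(value: UUID)

      val next: (String, Long, Any) = ("jo-1", 1L, TimeStamped(UUID.randomUUID()))

      // inside(...) returns the body's result, so the extracted UUID can be
      // bound for later ordering assertions, as in the spec above.
      val uuid1: UUID = inside(next) { case (event, seqNr, TimeStamped(uuid)) =>
        event shouldBe "jo-1"
        seqNr shouldBe 1L
        uuid
      }
    }
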
diff --git a/core/src/test/scala/org/apache/pekko/persistence/cassandra/sharding/ClusterShardingQuickTerminationSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/cassandra/sharding/ClusterShardingQuickTerminationSpec.scala
index 6617b96..966078b 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/cassandra/sharding/ClusterShardingQuickTerminationSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/cassandra/sharding/ClusterShardingQuickTerminationSpec.scala
@@ -14,8 +14,8 @@
package org.apache.pekko.persistence.cassandra.sharding
import org.apache.pekko.actor.{ ActorLogging, ActorRef, Props, ReceiveTimeout }
-import org.apache.pekko.cluster.{ Cluster, MemberStatus }
import org.apache.pekko.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
+import org.apache.pekko.cluster.{ Cluster, MemberStatus }
import org.apache.pekko.persistence.PersistentActor
import org.apache.pekko.persistence.cassandra.CassandraSpec
import org.apache.pekko.testkit.TestProbe
@@ -72,7 +72,6 @@ object ClusterShardingQuickTerminationSpec {
case EntityEnvelope(id, _) => (id % numberOfShards).toString
case Get(id) => (id % numberOfShards).toString
}
-
}
class ClusterShardingQuickTerminationSpec extends CassandraSpec("""
diff --git a/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java
index 2bbf612..2357d68 100644
--- a/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java
+++ b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java
@@ -1,6 +1,14 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
package jdoc.cleanup;
-import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.persistence.cassandra.cleanup.Cleanup;
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
diff --git a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
index 347152b..9e776f5 100644
--- a/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
+++ b/docs/src/test/scala/doc/cleanup/CleanupDocExample.scala
@@ -1,3 +1,12 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, derived from Akka.
+ */
+
package doc.cleanup
import org.apache.pekko.actor.ActorSystem
diff --git a/dse-test/src/test/resources/application.conf b/dse-test/src/test/resources/application.conf
index 68b931d..18de9b1 100644
--- a/dse-test/src/test/resources/application.conf
+++ b/dse-test/src/test/resources/application.conf
@@ -1,3 +1,5 @@
+# SPDX-License-Identifier: Apache-2.0
+
datastax-java-driver {
basic.contact-points = [ "127.0.0.1:9043"]
basic.load-balancing-policy.local-datacenter = "dc1"