Posted to commits@pekko.apache.org by fa...@apache.org on 2023/02/09 23:06:05 UTC

[incubator-pekko-connectors-kafka] branch main updated: Using `pekko` instead of `akka` in reference.conf and related configurations (#30)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ee59d4d Using `pekko` instead of `akka` in reference.conf and related configurations (#30)
7ee59d4d is described below

commit 7ee59d4d7e6808d2141fb6d02143bbedf6f222ef
Author: Seeta Ramayya <35...@users.noreply.github.com>
AuthorDate: Fri Feb 10 00:05:59 2023 +0100

    Using `pekko` instead of `akka` in reference.conf and related configurations (#30)
    
    * Configuration: Using `pekko` instead of `akka` in reference.conf and other conf files
    
    * Replaced Akka documentation URLs with Pekko documentation URLs
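    
    A minimal sketch of what the rename means for applications (illustrative,
    not part of this commit; assumes the Typesafe Config library on the classpath):
    
        import com.typesafe.config.ConfigFactory
    
        val config = ConfigFactory.load()
        // Before this commit the connector read its settings from `akka.kafka.*`;
        // after it, the same sections are read from `pekko.kafka.*`.
        val consumerSection = config.getConfig("pekko.kafka.consumer")
        val producerSection = config.getConfig("pekko.kafka.producer")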
---
 .github/workflows/check-build-test.yml             |  2 +-
 .../pekko/kafka/benchmarks/BatchedConsumer.scala   |  7 +-
 .../apache/pekko/kafka/benchmarks/Benchmarks.scala |  8 +-
 .../kafka/benchmarks/NoCommitBackpressure.scala    |  2 +-
 .../PekkoConnectorsCommittableProducer.scala       |  8 +-
 .../apache/pekko/kafka/benchmarks/Producer.scala   |  2 +-
 .../pekko/kafka/benchmarks/Transactions.scala      |  4 +-
 benchmarks/src/main/resources/reference.conf       |  6 +-
 .../pekko/kafka/benchmarks/InflightMetrics.scala   |  2 +-
 .../cluster/sharding/KafkaClusterSharding.scala    | 20 ++---
 core/src/main/resources/reference.conf             | 42 +++++-----
 .../org/apache/pekko/kafka/CommitterSettings.scala | 22 +++---
 .../pekko/kafka/ConnectionCheckerSettings.scala    |  2 +-
 .../org/apache/pekko/kafka/ConsumerSettings.scala  | 38 ++++-----
 .../kafka/OffsetResetProtectionSettings.scala      |  2 +-
 .../org/apache/pekko/kafka/ProducerSettings.scala  | 38 ++++-----
 .../kafka/internal/ConsumerResetProtection.scala   |  4 +-
 .../pekko/kafka/internal/LoggingWithId.scala       |  4 +-
 .../pekko/kafka/internal/SourceLogicBuffer.scala   |  2 +-
 .../org/apache/pekko/kafka/javadsl/Consumer.scala  |  6 +-
 .../org/apache/pekko/kafka/javadsl/Producer.scala  |  6 +-
 .../apache/pekko/kafka/javadsl/SendProducer.scala  |  4 +-
 .../apache/pekko/kafka/javadsl/Transactional.scala |  6 +-
 .../org/apache/pekko/kafka/scaladsl/Consumer.scala | 16 ++--
 .../pekko/kafka/scaladsl/DiscoverySupport.scala    |  8 +-
 .../org/apache/pekko/kafka/scaladsl/Producer.scala |  6 +-
 .../apache/pekko/kafka/scaladsl/SendProducer.scala |  2 +-
 .../pekko/kafka/scaladsl/Transactional.scala       |  6 +-
 project/ProjectSettings.scala                      |  6 +-
 project/VersionGenerator.scala                     |  8 +-
 project/Versions.scala                             |  2 +-
 project/project-info.conf                          |  2 +-
 .../testkit/internal/KafkaContainerCluster.java    |  2 +-
 testkit/src/main/resources/reference.conf          | 10 +--
 .../pekko/kafka/testkit/KafkaTestkitSettings.scala |  2 +-
 .../KafkaTestkitTestcontainersSettings.scala       |  2 +-
 tests/src/it/resources/reference.conf              |  8 +-
 .../java/docs/javadsl/ConsumerExampleTest.java     |  2 +-
 tests/src/test/java/docs/javadsl/ProducerTest.java |  2 +-
 tests/src/test/resources/application.conf          |  8 +-
 .../docs/scaladsl/ClusterShardingExample.scala     |  6 +-
 .../test/scala/docs/scaladsl/ConsumerExample.scala |  2 +-
 .../test/scala/docs/scaladsl/ProducerExample.scala |  6 +-
 .../apache/pekko/kafka/ConsumerSettingsSpec.scala  | 90 +++++++++++-----------
 .../apache/pekko/kafka/ProducerSettingsSpec.scala  | 78 +++++++++----------
 .../kafka/internal/CommittingWithMockSpec.scala    |  2 +-
 .../apache/pekko/kafka/internal/ConsumerSpec.scala |  2 +-
 .../kafka/internal/PartitionedSourceSpec.scala     |  2 +-
 .../apache/pekko/kafka/internal/ProducerSpec.scala |  6 +-
 .../pekko/kafka/scaladsl/IntegrationSpec.scala     |  2 +-
 .../pekko/kafka/scaladsl/RebalanceSpec.scala       |  4 +-
 .../pekko/kafka/tests/CapturingAppender.scala      | 10 +--
 .../org/apache/pekko/kafka/tests/LogbackUtil.scala |  2 +-
 .../kafka/tests/javadsl/LogCapturingJunit4.scala   |  6 +-
 .../pekko/kafka/tests/scaladsl/LogCapturing.scala  |  6 +-
 55 files changed, 277 insertions(+), 276 deletions(-)

diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml
index bcc82532..7288ceb2 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -10,7 +10,7 @@ on:
     tags-ignore: [ v.* ]
 
 env:
-  AKKA_TEST_TIMEFACTOR: 10.0
+  PEKKO_TEST_TIMEFACTOR: 10.0
   EVENT_NAME: ${{ github.event_name }}
 
 jobs:
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala
index 891a00a8..d7955a88 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala
@@ -39,14 +39,14 @@ class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
 class PekkoConnectorsKafkaBatchedConsumer extends BenchmarksBase() {
 
   it should "bench with small messages" in {
-    val cmd = RunTestCommand("alpakka-kafka-batched-consumer", bootstrapServers, topic_1000_100)
+    val cmd = RunTestCommand("pekko-connectors-kafka-batched-consumer", bootstrapServers, topic_1000_100)
     runPerfTest(cmd,
       ReactiveKafkaConsumerFixtures.committableSources(cmd),
       ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
   }
 
   it should "bench with normal messages" in {
-    val cmd = RunTestCommand("alpakka-kafka-batched-consumer-normal-msg", bootstrapServers, topic_1000_5000)
+    val cmd = RunTestCommand("pekko-connectors-kafka-batched-consumer-normal-msg", bootstrapServers, topic_1000_5000)
     runPerfTest(cmd,
       ReactiveKafkaConsumerFixtures.committableSources(cmd),
       ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
@@ -54,7 +54,8 @@ class PekkoConnectorsKafkaBatchedConsumer extends BenchmarksBase() {
 
   it should "bench with normal messages and eight partitions" in {
     val cmd =
-      RunTestCommand("alpakka-kafka-batched-consumer-normal-msg-8-partitions", bootstrapServers, topic_1000_5000_8)
+      RunTestCommand("pekko-connectors-kafka-batched-consumer-normal-msg-8-partitions", bootstrapServers,
+        topic_1000_5000_8)
     runPerfTest(cmd,
       ReactiveKafkaConsumerFixtures.committableSources(cmd),
       ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala
index a72ddb8e..78c8ca1a 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala
@@ -63,7 +63,7 @@ class ApacheKafkaConsumerNokafka extends BenchmarksBase() {
 
 class PekkoConnectorsKafkaConsumerNokafka extends BenchmarksBase() {
   it should "bench" in {
-    val cmd = RunTestCommand("alpakka-kafka-plain-consumer-nokafka", bootstrapServers, topic_2000_100)
+    val cmd = RunTestCommand("pekko-connectors-kafka-plain-consumer-nokafka", bootstrapServers, topic_2000_100)
     runPerfTest(cmd,
       ReactiveKafkaConsumerFixtures.noopFixtureGen(cmd),
       ReactiveKafkaConsumerBenchmarks.consumePlainNoKafka)
@@ -85,13 +85,13 @@ class ApacheKafkaPlainConsumer extends BenchmarksBase() {
 
 class PekkoConnectorsKafkaPlainConsumer extends BenchmarksBase() {
   it should "bench" in {
-    val cmd = RunTestCommand("alpakka-kafka-plain-consumer", bootstrapServers, topic_2000_100)
+    val cmd = RunTestCommand("pekko-connectors-kafka-plain-consumer", bootstrapServers, topic_2000_100)
     runPerfTest(cmd, ReactiveKafkaConsumerFixtures.plainSources(cmd), ReactiveKafkaConsumerBenchmarks.consumePlain)
   }
 
   it should "bench with normal messages and one hundred partitions with inflight metrics" in {
     val cmd =
-      RunTestCommand("alpakka-kafka-plain-consumer-normal-msg-100-partitions-with-inflight-metrics",
+      RunTestCommand("pekko-connectors-kafka-plain-consumer-normal-msg-100-partitions-with-inflight-metrics",
         bootstrapServers,
         topic_1000_5000_100)
     val consumerMetricNames = List[ConsumerMetricRequest](
@@ -129,7 +129,7 @@ class ApacheKafkaAtMostOnceConsumer extends BenchmarksBase() {
 
 class PekkoConnectorsKafkaAtMostOnceConsumer extends BenchmarksBase() {
   it should "bench" in {
-    val cmd = RunTestCommand("alpakka-kafka-at-most-once-consumer", bootstrapServers, topic_50_100)
+    val cmd = RunTestCommand("pekko-connectors-kafka-at-most-once-consumer", bootstrapServers, topic_50_100)
     runPerfTest(cmd,
       ReactiveKafkaConsumerFixtures.committableSources(cmd),
       ReactiveKafkaConsumerBenchmarks.consumeCommitAtMostOnce)
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala
index d7e66849..ecb8d464 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala
@@ -39,7 +39,7 @@ class RawKafkaCommitEveryPollConsumer extends BenchmarksBase() {
 }
 
 class PekkoConnectorsCommitAndForgetConsumer extends BenchmarksBase() {
-  val prefix = "alpakka-kafka-commit-and-forget-"
+  val prefix = "pekko-connectors-kafka-commit-and-forget-"
 
   it should "bench with small messages" in {
     val cmd = RunTestCommand(prefix + "consumer", bootstrapServers, topic_1000_100)
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala
index 4713f41b..906f7c71 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala
@@ -14,7 +14,7 @@ import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
  */
 class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
   it should "bench composed sink with 100b messages" in {
-    val cmd = RunTestCommand("alpakka-committable-producer-composed", bootstrapServers, topic_100_100)
+    val cmd = RunTestCommand("pekko-connectors-committable-producer-composed", bootstrapServers, topic_100_100)
     runPerfTest(
       cmd,
       PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
@@ -22,7 +22,7 @@ class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
   }
 
   it should "bench composed sink with 5000b messages" in {
-    val cmd = RunTestCommand("alpakka-committable-producer-composed-5000b", bootstrapServers, topic_100_5000)
+    val cmd = RunTestCommand("pekko-connectors-committable-producer-composed-5000b", bootstrapServers, topic_100_5000)
     runPerfTest(
       cmd,
       PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
@@ -30,7 +30,7 @@ class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
   }
 
   it should "bench `Producer.committableSink` with 100b messages" in {
-    val cmd = RunTestCommand("alpakka-committable-producer", bootstrapServers, topic_100_100)
+    val cmd = RunTestCommand("pekko-connectors-committable-producer", bootstrapServers, topic_100_100)
     runPerfTest(
       cmd,
       PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
@@ -38,7 +38,7 @@ class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
   }
 
   it should "bench `Producer.committableSink` with 5000b messages" in {
-    val cmd = RunTestCommand("alpakka-committable-producer-5000b", bootstrapServers, topic_100_5000)
+    val cmd = RunTestCommand("pekko-connectors-committable-producer-5000b", bootstrapServers, topic_100_5000)
     runPerfTest(
       cmd,
       PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala
index d6441081..17b093c5 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala
@@ -40,7 +40,7 @@ class ApacheKafkaPlainProducer extends BenchmarksBase() {
 }
 
 class PekkoConnectorsKafkaPlainProducer extends BenchmarksBase() {
-  private val prefix = "alpakka-kafka-plain-producer"
+  private val prefix = "pekko-connectors-kafka-plain-producer"
 
   it should "bench with small messages" in {
     val cmd = RunTestCommand(prefix, bootstrapServers, topic_2000_100.freshTopic)
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala
index a2348a84..fda70f25 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala
@@ -28,7 +28,7 @@ class ApacheKafkaTransactions extends BenchmarksBase() {
 
 class PekkoConnectorsKafkaTransactions extends BenchmarksBase() {
   it should "bench with small messages" in {
-    val cmd = RunTestCommand("alpakka-kafka-transactions", bootstrapServers, topic_100_100)
+    val cmd = RunTestCommand("pekko-connectors-kafka-transactions", bootstrapServers, topic_100_100)
     runPerfTest(
       cmd,
       ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
@@ -36,7 +36,7 @@ class PekkoConnectorsKafkaTransactions extends BenchmarksBase() {
   }
 
   it should "bench with normal messages" in {
-    val cmd = RunTestCommand("alpakka-kafka-transactions-normal-msg", bootstrapServers, topic_100_5000)
+    val cmd = RunTestCommand("pekko-connectors-kafka-transactions-normal-msg", bootstrapServers, topic_100_5000)
     runPerfTest(
       cmd,
       ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
diff --git a/benchmarks/src/main/resources/reference.conf b/benchmarks/src/main/resources/reference.conf
index 594f20a6..a158d31c 100644
--- a/benchmarks/src/main/resources/reference.conf
+++ b/benchmarks/src/main/resources/reference.conf
@@ -1,5 +1,5 @@
-akka {
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
+pekko {
+  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
   loglevel = "INFO"
-  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
 }
\ No newline at end of file
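
The same logger settings, expressed programmatically; a sketch assuming the
Typesafe Config library (not part of this commit):

    import com.typesafe.config.ConfigFactory

    // Mirrors the renamed section above: the SLF4J logger and filter classes
    // now live in the org.apache.pekko.event.slf4j package.
    val loggingConfig = ConfigFactory.parseString("""
      pekko {
        loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
        loglevel = "INFO"
        logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
      }
    """)
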
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
index 6ca00b04..999c5561 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
@@ -151,7 +151,7 @@ private[benchmarks] trait InflightMetrics {
   }
 
   /**
-   * Return specified consumer-level metrics using Alpakka Kafka's [[Control]] metrics API.
+   * Return specified consumer-level metrics using Apache Pekko Connectors Kafka's [[Control]] metrics API.
    */
   private def consumer[T](control: Control, requests: List[ConsumerMetricRequest])(
       implicit ec: ExecutionContext): Future[List[Measurement]] = {
diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
index 149d541b..1952f181 100644
--- a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
+++ b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
@@ -33,7 +33,7 @@ import scala.compat.java8.FutureConverters._
 /**
  * API MAY CHANGE
  *
- * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka.
+ * Apache Pekko Extension to enable Apache Pekko Cluster External Sharding with Apache Pekko Connectors Kafka.
  */
 @ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
 final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension {
@@ -205,13 +205,13 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
   /**
    * API MAY CHANGE
    *
-   * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
+   * Create an Apache Pekko Connectors Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
    * used to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member
    * the rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy
-   * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+   * accordingly so that entities are (eventually) routed to the local Apache Pekko cluster member.
    *
-   * Returns an Akka typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
-   * passed to an Alpakka Kafka [[ConsumerSettings]].
+   * Returns an Apache Pekko typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
+   * passed to an Apache Pekko Connectors Kafka [[ConsumerSettings]].
    *
    * {{{
    * import org.apache.pekko.actor.typed.scaladsl.adapter._
@@ -232,13 +232,13 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
    *
    * API MAY CHANGE
    *
-   * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
+   * Create an Apache Pekko Connectors Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
    * used to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member
    * the rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy
-   * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+   * accordingly so that entities are (eventually) routed to the local Apache Pekko cluster member.
    *
-   * Returns an Akka typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
-   * passed to an Alpakka Kafka [[ConsumerSettings]].
+   * Returns an Apache Pekko typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
+   * passed to an Apache Pekko Connectors Kafka [[ConsumerSettings]].
    *
    * {{{
    * import org.apache.pekko.actor.typed.scaladsl.adapter._
@@ -304,7 +304,7 @@ object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {
 
             val updates = shardAllocationClient.updateShardLocations(partitions.map { tp =>
               val shardId = tp.partition().toString
-              // the Kafka partition number becomes the akka shard id
+              // the Kafka partition number becomes the Apache Pekko shard id
               (shardId, address)
             }.toMap)
 
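The scaladoc above notes that the typed ActorRef must be converted to a classic
actor before it can be used with the consumer. A hedged sketch of that
conversion via the connector's Subscriptions API (topic name and listener value
are hypothetical):

    import org.apache.pekko.actor.typed.ActorRef
    import org.apache.pekko.actor.typed.scaladsl.adapter._
    import org.apache.pekko.kafka.{ ConsumerRebalanceEvent, Subscriptions }

    // Hypothetical: obtained from KafkaClusterSharding's rebalance-listener factory.
    val rebalanceListener: ActorRef[ConsumerRebalanceEvent] = ???

    // `toClassic` adapts the typed ref so the subscription accepts it.
    val subscription = Subscriptions
      .topics("user-events")
      .withRebalanceListener(rebalanceListener.toClassic)
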
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 83fa291d..efd7a5e2 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -1,15 +1,15 @@
 
 # // #producer-settings
-# Properties for akka.kafka.ProducerSettings can be
+# Properties for org.apache.pekko.kafka.ProducerSettings can be
 # defined in this section or a configuration section with
 # the same layout.
-akka.kafka.producer {
-  # Config path of Akka Discovery method
-  # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
-  discovery-method = akka.discovery
+pekko.kafka.producer {
+  # Config path of Apache Pekko Discovery method
+  # "pekko.discovery" to use the Apache Pekko Discovery method configured for the ActorSystem
+  discovery-method = pekko.discovery
 
-  # Set a service name for use with Akka Discovery
-  # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
+  # Set a service name for use with Apache Pekko Discovery
+  # https://pekko.apache.org/docs/pekko-connectors-kafka/current/discovery.html
   service-name = ""
 
   # Timeout for getting a reply from the discovery-method lookup
@@ -30,7 +30,7 @@ akka.kafka.producer {
   # to be used by the producer stages. Some blocking may occur.
   # When this value is empty, the dispatcher configured for the stream
   # will be used.
-  use-dispatcher = "akka.kafka.default-dispatcher"
+  use-dispatcher = "pekko.kafka.default-dispatcher"
 
   # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
   # for exactly-once-semantics processing.
@@ -44,16 +44,16 @@ akka.kafka.producer {
 # // #producer-settings
 
 # // #consumer-settings
-# Properties for akka.kafka.ConsumerSettings can be
+# Properties for org.apache.pekko.kafka.ConsumerSettings can be
 # defined in this section or a configuration section with
 # the same layout.
-akka.kafka.consumer {
-  # Config path of Akka Discovery method
-  # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
-  discovery-method = akka.discovery
+pekko.kafka.consumer {
+  # Config path of Apache Pekko Discovery method
+  # "pekko.discovery" to use the Apache Pekko Discovery method configured for the ActorSystem
+  discovery-method = pekko.discovery
 
-  # Set a service name for use with Akka Discovery
-  # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
+  # Set a service name for use with Apache Pekko Discovery
+  # https://pekko.apache.org/docs/pekko-connectors-kafka/current/discovery.html
   service-name = ""
 
   # Timeout for getting a reply from the discovery-method lookup
@@ -92,7 +92,7 @@ akka.kafka.consumer {
 
   # Fully qualified config path which holds the dispatcher configuration
   # to be used by the KafkaConsumerActor. Some blocking may occur.
-  use-dispatcher = "akka.kafka.default-dispatcher"
+  use-dispatcher = "pekko.kafka.default-dispatcher"
 
   # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
   # can be defined in this configuration section.
@@ -111,7 +111,7 @@ akka.kafka.consumer {
   # call to Kafka's API
   offset-for-times-timeout = 5s
 
-  # Timeout for akka.kafka.Metadata requests
+  # Timeout for org.apache.pekko.kafka.Metadata requests
   # This value is used instead of Kafka's default from `default.api.timeout.ms`
   # which is 1 minute.
   metadata-request-timeout = 5s
@@ -147,7 +147,7 @@ akka.kafka.consumer {
   # then causes the Kafka consumer to follow its normal 'auto.offset.reset' behavior. For 'earliest', these settings
   # allow the client to detect and attempt to recover from this issue. For 'none' and 'latest', these settings will
   # only add overhead. See
-  # https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#unexpected-consumer-offset-reset
+  # https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#unexpected-consumer-offset-reset
   # for more information
   offset-reset-protection {
     # turns on reset protection
@@ -162,10 +162,10 @@ akka.kafka.consumer {
 # // #consumer-settings
 
 # // #committer-settings
-# Properties for akka.kafka.CommitterSettings can be
+# Properties for org.apache.pekko.kafka.CommitterSettings can be
 # defined in this section or a configuration section with
 # the same layout.
-akka.kafka.committer {
+pekko.kafka.committer {
 
   # Maximum number of messages in a single commit batch
   max-batch = 1000
@@ -192,7 +192,7 @@ akka.kafka.committer {
 
 # The dispatcher that will be used by default by consumer and
 # producer stages.
-akka.kafka.default-dispatcher {
+pekko.kafka.default-dispatcher {
   type = "Dispatcher"
   executor = "thread-pool-executor"
 
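A hedged sketch of overriding these renamed keys from application code (values
and layering are illustrative; assumes the Typesafe Config library):

    import com.typesafe.config.ConfigFactory

    // Layers an override on top of the `pekko.kafka.consumer` defaults above.
    val appConfig = ConfigFactory
      .parseString("""
        pekko.kafka.consumer {
          stop-timeout = 30s
          use-dispatcher = "pekko.kafka.default-dispatcher"
        }
      """)
      .withFallback(ConfigFactory.load())
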
diff --git a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
index 00e2e3e0..4b8ecaa2 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
@@ -98,27 +98,27 @@ object CommitWhen {
 
 object CommitterSettings {
 
-  val configPath = "akka.kafka.committer"
+  val configPath = "pekko.kafka.committer"
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.committer`.
+   * `pekko.kafka.committer`.
    */
   def apply(actorSystem: org.apache.pekko.actor.ActorSystem): CommitterSettings =
     apply(actorSystem.settings.config.getConfig(configPath))
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.committer`.
+   * `pekko.kafka.committer`.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def apply(actorSystem: org.apache.pekko.actor.ClassicActorSystemProvider): CommitterSettings =
     apply(actorSystem.classicSystem.settings.config.getConfig(configPath))
 
   /**
    * Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.committer`.
+   * the default configuration `pekko.kafka.committer`.
    */
   def apply(config: Config): CommitterSettings = {
     val maxBatch = config.getLong("max-batch")
@@ -131,23 +131,23 @@ object CommitterSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.committer`.
+   * `pekko.kafka.committer`.
    */
   def create(actorSystem: org.apache.pekko.actor.ActorSystem): CommitterSettings =
     apply(actorSystem)
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.committer`.
+   * `pekko.kafka.committer`.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def create(actorSystem: org.apache.pekko.actor.ClassicActorSystemProvider): CommitterSettings =
     apply(actorSystem)
 
   /**
    * Java API: Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.committer`.
+   * the default configuration `pekko.kafka.committer`.
    */
   def create(config: Config): CommitterSettings =
     apply(config)
@@ -155,7 +155,7 @@ object CommitterSettings {
 }
 
 /**
- * Settings for committer. See `akka.kafka.committer` section in
+ * Settings for committer. See `pekko.kafka.committer` section in
  * reference.conf. Note that the [[org.apache.pekko.kafka.CommitterSettings$ companion]] object provides
  * `apply` and `create` functions for convenient construction of the settings, together with
  * the `with` methods.
@@ -195,7 +195,7 @@ class CommitterSettings private (
     new CommitterSettings(maxBatch, maxInterval, parallelism, delivery, when)
 
   override def toString: String =
-    "akka.kafka.CommitterSettings(" +
+    "org.apache.pekko.kafka.CommitterSettings(" +
     s"maxBatch=$maxBatch," +
     s"maxInterval=${maxInterval.toCoarsest}," +
     s"parallelism=$parallelism," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
index 2745ab51..77f8da3f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
@@ -42,7 +42,7 @@ class ConnectionCheckerSettings private[kafka] (val enable: Boolean,
     copy(checkInterval = checkInterval.asScala)
 
   override def toString: String =
-    s"akka.kafka.ConnectionCheckerSettings(" +
+    s"org.apache.pekko.kafka.ConnectionCheckerSettings(" +
     s"enable=$enable," +
     s"maxRetries=$maxRetries," +
     s"checkInterval=${checkInterval.toCoarsest}," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index 113aed60..18eac4fd 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -23,11 +23,11 @@ import scala.concurrent.duration._
 
 object ConsumerSettings {
 
-  val configPath = "akka.kafka.consumer"
+  val configPath = "pekko.kafka.consumer"
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key or value deserializer can be passed explicitly or retrieved from configuration.
    */
   def apply[K, V](
@@ -40,10 +40,10 @@ object ConsumerSettings {
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key or value deserializer can be passed explicitly or retrieved from configuration.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def apply[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -53,7 +53,7 @@ object ConsumerSettings {
 
   /**
    * Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.consumer`.
+   * the default configuration `pekko.kafka.consumer`.
    * Key or value deserializer can be passed explicitly or retrieved from configuration.
    */
   def apply[K, V](
@@ -113,7 +113,7 @@ object ConsumerSettings {
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key and value serializer must be passed explicitly.
    */
   def apply[K, V](
@@ -124,10 +124,10 @@ object ConsumerSettings {
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key and value serializer must be passed explicitly.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def apply[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -137,7 +137,7 @@ object ConsumerSettings {
 
   /**
    * Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.consumer`.
+   * the default configuration `pekko.kafka.consumer`.
    * Key and value serializer must be passed explicitly.
    */
   def apply[K, V](
@@ -148,7 +148,7 @@ object ConsumerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key or value deserializer can be passed explicitly or retrieved from configuration.
    */
   def create[K, V](
@@ -159,10 +159,10 @@ object ConsumerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key or value deserializer can be passed explicitly or retrieved from configuration.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def create[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -172,7 +172,7 @@ object ConsumerSettings {
 
   /**
    * Java API: Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.consumer`.
+   * the default configuration `pekko.kafka.consumer`.
    * Key or value deserializer can be passed explicitly or retrieved from configuration.
    */
   def create[K, V](
@@ -183,7 +183,7 @@ object ConsumerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key and value serializer must be passed explicitly.
    */
   def create[K, V](
@@ -194,10 +194,10 @@ object ConsumerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.consumer`.
+   * `pekko.kafka.consumer`.
    * Key and value serializer must be passed explicitly.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def create[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -207,7 +207,7 @@ object ConsumerSettings {
 
   /**
    * Java API: Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.consumer`.
+   * the default configuration `pekko.kafka.consumer`.
    * Key and value serializer must be passed explicitly.
    */
   def create[K, V](
@@ -227,7 +227,7 @@ object ConsumerSettings {
 }
 
 /**
- * Settings for consumers. See `akka.kafka.consumer` section in
+ * Settings for consumers. See `pekko.kafka.consumer` section in
  * `reference.conf`. Note that the [[org.apache.pekko.kafka.ConsumerSettings$ companion]] object provides
  * `apply` and `create` functions for convenient construction of the settings, together with
  * the `with` methods.
@@ -623,7 +623,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
       }
       .sortBy(_._1)
       .mkString(",")
-    "akka.kafka.ConsumerSettings(" +
+    "org.apache.pekko.kafka.ConsumerSettings(" +
     s"properties=$kafkaClients," +
     s"keyDeserializer=$keyDeserializerOpt," +
     s"valueDeserializer=$valueDeserializerOpt," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
index 1cba0cd4..dcd70420 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
@@ -53,7 +53,7 @@ class OffsetResetProtectionSettings @InternalApi private[kafka] (val enable: Boo
     copy(timeThreshold = timeThreshold.asScala)
 
   override def toString: String =
-    s"akka.kafka.OffsetResetProtectionSettings(" +
+    s"org.apache.pekko.kafka.OffsetResetProtectionSettings(" +
     s"enable=$enable," +
     s"offsetThreshold=$offsetThreshold," +
     s"timeThreshold=${timeThreshold.toCoarsest}" +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
index de49a5b6..f447d336 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
@@ -24,11 +24,11 @@ import scala.compat.java8.FutureConverters._
 
 object ProducerSettings {
 
-  val configPath = "akka.kafka.producer"
+  val configPath = "pekko.kafka.producer"
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key or value serializer can be passed explicitly or retrieved from configuration.
    */
   def apply[K, V](
@@ -39,10 +39,10 @@ object ProducerSettings {
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key or value serializer can be passed explicitly or retrieved from configuration.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def apply[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -52,7 +52,7 @@ object ProducerSettings {
 
   /**
    * Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.producer`.
+   * the default configuration `pekko.kafka.producer`.
    * Key or value serializer can be passed explicitly or retrieved from configuration.
    */
   def apply[K, V](
@@ -88,7 +88,7 @@ object ProducerSettings {
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key and value serializer must be passed explicitly.
    */
   def apply[K, V](
@@ -99,10 +99,10 @@ object ProducerSettings {
 
   /**
    * Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key and value serializer must be passed explicitly.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def apply[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -112,7 +112,7 @@ object ProducerSettings {
 
   /**
    * Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.producer`.
+   * the default configuration `pekko.kafka.producer`.
    * Key and value serializer must be passed explicitly.
    */
   def apply[K, V](
@@ -123,7 +123,7 @@ object ProducerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key or value serializer can be passed explicitly or retrieved from configuration.
    */
   def create[K, V](
@@ -134,10 +134,10 @@ object ProducerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key or value serializer can be passed explicitly or retrieved from configuration.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def create[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -147,7 +147,7 @@ object ProducerSettings {
 
   /**
    * Java API: Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.producer`.
+   * the default configuration `pekko.kafka.producer`.
    * Key or value serializer can be passed explicitly or retrieved from configuration.
    */
   def create[K, V](
@@ -158,7 +158,7 @@ object ProducerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key and value serializer must be passed explicitly.
    */
   def create[K, V](
@@ -169,10 +169,10 @@ object ProducerSettings {
 
   /**
    * Java API: Create settings from the default configuration
-   * `akka.kafka.producer`.
+   * `pekko.kafka.producer`.
    * Key and value serializer must be passed explicitly.
    *
-   * For use with the `akka.actor.typed` API.
+   * For use with the `pekko.actor.typed` API.
    */
   def create[K, V](
       system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -182,7 +182,7 @@ object ProducerSettings {
 
   /**
    * Java API: Create settings from a configuration with the same layout as
-   * the default configuration `akka.kafka.producer`.
+   * the default configuration `pekko.kafka.producer`.
    * Key and value serializer must be passed explicitly.
    */
   def create[K, V](
@@ -201,7 +201,7 @@ object ProducerSettings {
 }
 
 /**
- * Settings for producers. See `akka.kafka.producer` section in
+ * Settings for producers. See `pekko.kafka.producer` section in
  * reference.conf. Note that the [[org.apache.pekko.kafka.ProducerSettings$ companion]] object provides
  * `apply` and `create` functions for convenient construction of the settings, together with
  * the `with` methods.
@@ -392,7 +392,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
       }
       .sortBy(_._1)
       .mkString(",")
-    "akka.kafka.ProducerSettings(" +
+    "org.apache.pekko.kafka.ProducerSettings(" +
     s"properties=$kafkaClients," +
     s"keySerializer=$keySerializerOpt," +
     s"valueSerializer=$valueSerializerOpt," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
index 2cc5dd6b..8048c67e 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
@@ -109,13 +109,13 @@ object ConsumerResetProtection {
           log.warning(
             s"Your last commit request $previouslyCommitted is more than the configured threshold from the last" +
             s"committed offset ($committed) for $tp. See " +
-            "https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#setting-offset-threshold-appropriately for more info.")
+            "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#setting-offset-threshold-appropriately for more info.")
         }
         log.warning(
           s"Dropping offsets for partition $tp - received an offset which is less than allowed $threshold " +
           s"from the  last requested offset (threshold: $threshold). Seeking to the latest known safe (committed " +
           s"or assigned) offset: $committed. See  " +
-          "https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#unexpected-consumer-offset-reset" +
+          "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#unexpected-consumer-offset-reset" +
           "for more information.")
         consumer ! Seek(Map(tp -> committed.offset()))
         None
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
index 761a3bf9..cc7b7ec2 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
@@ -18,7 +18,7 @@ private[internal] trait InstanceId {
 }
 
 /**
- * Override akka streams [[StageLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
+ * Override Apache Pekko Streams [[StageLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
  */
 private[internal] trait StageIdLogging extends StageLogging with InstanceId { self: GraphStageLogic =>
   private[this] var _log: LoggingAdapter = _
@@ -32,7 +32,7 @@ private[internal] trait StageIdLogging extends StageLogging with InstanceId { se
 }
 
 /**
- * Override akka classic [[ActorLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
+ * Override Apache Pekko classic [[ActorLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
  */
 private[internal] trait ActorIdLogging extends ActorLogging with InstanceId { this: Actor =>
   private[this] var _log: LoggingAdapter = _
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala
index 8437662c..346e0cf6 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala
@@ -13,7 +13,7 @@ import org.apache.kafka.common.TopicPartition
  * A buffer of messages provided by the [[KafkaConsumerActor]] for a Source Logic. When partitions are rebalanced
  * away from this Source Logic preemptively filter out messages for those partitions.
  *
- * NOTE: Due to the asynchronous nature of Akka Streams, it's not possible to guarantee that a message has not
+ * NOTE: Due to the asynchronous nature of Apache Pekko Streams, it's not possible to guarantee that a message has not
  * already been sent downstream for a revoked partition before the rebalance handler invokes
  * `filterRevokedPartitionsCB`. The best we can do is filter as many messages as possible to reduce the amount of
  * duplicate messages sent downstream.
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
index 874b40c9..80be91b1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
@@ -24,7 +24,7 @@ import scala.compat.java8.FutureConverters._
 import scala.concurrent.duration.FiniteDuration
 
 /**
- * Akka Stream connector for subscribing to Kafka topics.
+ * Apache Pekko Stream connector for subscribing to Kafka topics.
  */
 object Consumer {
 
@@ -168,7 +168,7 @@ object Consumer {
    * This is useful when "at-least once delivery" is desired, as each message will likely be
    * delivered one time but in failure cases could be duplicated.
    *
-   * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
    * and [[Producer.flowWithContext]].
    */
   @ApiMayChange
@@ -192,7 +192,7 @@ object Consumer {
    * This is useful when "at-least once delivery" is desired, as each message will likely be
    * delivered one time but in failure cases could be duplicated.
    *
-   * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
    * and [[Producer.flowWithContext]].
    *
    * This variant makes it possible to add additional metadata (in the form of a string)
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
index 522ed462..dba527ca 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
@@ -18,7 +18,7 @@ import scala.annotation.nowarn
 import scala.compat.java8.FutureConverters._
 
 /**
- * Akka Stream connector for publishing messages to Kafka topics.
+ * Apache Pekko Stream connector for publishing messages to Kafka topics.
  */
 object Producer {
 
@@ -210,7 +210,7 @@ object Producer {
    *
    * - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
    *
-   * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+   * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
    *
    * @tparam C the flow context type
    */
@@ -278,7 +278,7 @@ object Producer {
    *
    * - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
    *
-   * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+   * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
    *
    * Supports sharing a Kafka Producer instance.
    *
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
index 61696610..75f2a13f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
@@ -16,7 +16,7 @@ import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
 import scala.compat.java8.FutureConverters._
 
 /**
- * Utility class for producing to Kafka without using Akka Streams.
+ * Utility class for producing to Kafka without using Apache Pekko Streams.
  */
 final class SendProducer[K, V] private (underlying: scaladsl.SendProducer[K, V]) {
 
@@ -26,7 +26,7 @@ final class SendProducer[K, V] private (underlying: scaladsl.SendProducer[K, V])
     this(scaladsl.SendProducer(settings)(system))
 
   /**
-   * Utility class for producing to Kafka without using Akka Streams.
+   * Utility class for producing to Kafka without using Apache Pekko Streams.
    * @param settings producer settings used to create or access the [[org.apache.kafka.clients.producer.Producer]]
    *
    * The internal asynchronous operations run on the provided `Executor` (which may be an `ActorSystem`'s dispatcher).
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
index 1d13ff39..9b651911 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import scala.compat.java8.FutureConverters.FutureOps
 
 /**
- *  Akka Stream connector to support transactions between Kafka topics.
+ *  Apache Pekko Stream connector to support transactions between Kafka topics.
  */
 object Transactional {
 
@@ -39,7 +39,7 @@ object Transactional {
   /**
    * API MAY CHANGE
    *
-   * This source is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+   * This source is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
    * and [[Transactional.flowWithOffsetContext]].
    */
   @ApiMayChange
@@ -125,7 +125,7 @@ object Transactional {
    * carries [[ConsumerMessage.PartitionOffset]] as context.  The flow requires a unique `transactional.id` across all app
    * instances. The flow will override producer properties to enable Kafka exactly-once transactional support.
    *
-   * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+   * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
    * and [[Transactional.sourceWithOffsetContext]].
    */
   @ApiMayChange
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
index 5599f9d4..2a32705d 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
@@ -20,14 +20,14 @@ import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ ExecutionContext, Future }
 
 /**
- * Akka Stream connector for subscribing to Kafka topics.
+ * Apache Pekko Stream connector for subscribing to Kafka topics.
  */
 object Consumer {
 
   /**
    * Materialized value of the consumer `Source`.
    *
-   * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+   * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
    */
   trait Control {
 
@@ -37,7 +37,7 @@ object Consumer {
      * already enqueued messages. It does not unsubscribe from any topics/partitions
      * as that could trigger a consumer group rebalance.
      *
-     * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+     * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
      *
      * Call [[#shutdown]] to close consumer.
      */
@@ -46,11 +46,11 @@ object Consumer {
     /**
      * Shut down the consumer `Source`.
      *
-     * The actor backing the source will stay alive for `akka.kafka.consumer.stop-timeout` so that more commits
+     * The actor backing the source will stay alive for `pekko.kafka.consumer.stop-timeout` so that more commits
      * from enqueued messages can be handled.
      * The actor will wait for acknowledgements of the already sent offset commits from the Kafka broker before shutting down.
      *
-     * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+     * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
      */
     def shutdown(): Future[Done]
 
@@ -94,7 +94,7 @@ object Consumer {
    * one, so that the stream can be stopped in a controlled way without losing
    * commits.
    *
-   * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+   * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
    */
   final class DrainingControl[T] private (control: Control, val streamCompletion: Future[T]) extends Control {
 
@@ -199,7 +199,7 @@ object Consumer {
    * This is useful when "at-least once delivery" is desired, as each message will likely be
    * delivered one time but in failure cases could be duplicated.
    *
-   * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html),
+   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html),
    * [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
    */
   @ApiMayChange
@@ -219,7 +219,7 @@ object Consumer {
    * This is useful when "at-least once delivery" is desired, as each message will likely be
    * delivered one time but in failure cases could be duplicated.
    *
-   * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html),
+   * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html),
    * [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
    *
    * This variant makes it possible to add additional metadata (in the form of a string)
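
A hedged sketch of the controlled-shutdown pattern the scaladoc above describes
(topic, group, and sink are illustrative):

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.kafka.{ ConsumerSettings, Subscriptions }
    import org.apache.pekko.kafka.scaladsl.Consumer
    import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl
    import org.apache.pekko.stream.scaladsl.Sink
    import org.apache.kafka.common.serialization.StringDeserializer

    implicit val system: ActorSystem = ActorSystem("example")
    import system.dispatcher

    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("group1")

    val control =
      Consumer
        .plainSource(settings, Subscriptions.topics("topic1"))
        .toMat(Sink.ignore)(DrainingControl.apply)
        .run()

    // Stops polling, waits for in-flight work (honouring
    // `pekko.kafka.consumer.stop-timeout`), then closes the consumer.
    control.drainAndShutdown()
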
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
index d98030b7..fe027e2a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
@@ -26,7 +26,7 @@ object DiscoverySupport {
   // used for initial discovery of contact points
   private def discovery(config: Config, system: ActorSystem): ServiceDiscovery =
     config.getString("discovery-method") match {
-      case "akka.discovery" =>
+      case "pekko.discovery" =>
         Discovery(system).discovery
 
       case otherDiscoveryMechanism =>
@@ -34,7 +34,7 @@ object DiscoverySupport {
     }
 
   /**
-   * Use Akka Discovery to read the addresses for `serviceName` within `lookupTimeout`.
+   * Use Apache Pekko Discovery to read the addresses for `serviceName` within `lookupTimeout`.
    */
   private def bootstrapServers(
       discovery: ServiceDiscovery,
@@ -55,7 +55,7 @@ object DiscoverySupport {
   /**
    * Internal API.
    *
-   * Expect a `service` section in Config and use Akka Discovery to read the addresses for `name` within `lookup-timeout`.
+   * Expect a `service` section in Config and use Apache Pekko Discovery to read the addresses for `name` within `lookup-timeout`.
    */
   @InternalApi
   private[kafka] def bootstrapServers(config: Config)(implicit system: ActorSystem): Future[String] = {
@@ -117,7 +117,7 @@ object DiscoverySupport {
     system.dynamicAccess.getClassFor("org.apache.pekko.discovery.Discovery$") match {
       case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
         throw new IllegalStateException(
-          s"Pekko Discovery is being used but the `pekko-discovery` library is not on the classpath, it must be added explicitly. See https://pekko.apache.org/docs/pekko/current/discovery/index.html")
+          s"Apache Pekko Discovery is being used but the `pekko-discovery` library is not on the classpath, it must be added explicitly. See https://pekko.apache.org/docs/pekko/current/discovery/index.html")
       case _ =>
     }
 
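
To make the renamed lookup concrete: with `discovery-method = pekko.discovery` configured and the
`pekko-discovery` module on the classpath, a config section carrying `service-name` and
`resolve-timeout` can be enriched along these lines (section and service names are illustrative):

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.kafka.ConsumerSettings
    import org.apache.pekko.kafka.scaladsl.DiscoverySupport
    import org.apache.kafka.common.serialization.StringDeserializer

    implicit val system: ActorSystem = ActorSystem("docs")

    // assumes application.conf contains e.g.
    //   discovery-consumer: ${pekko.kafka.consumer} {
    //     service-name = "kafkaService1"
    //     resolve-timeout = 10 ms
    //   }
    val section = system.settings.config.getConfig("discovery-consumer")
    val settings =
      ConsumerSettings(section, new StringDeserializer, new StringDeserializer)
        .withEnrichAsync(DiscoverySupport.consumerBootstrapServers(section))
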
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala
index 3a6f2fda..a5161bea 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala
@@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
 import scala.concurrent.Future
 
 /**
- * Akka Stream connector for publishing messages to Kafka topics.
+ * Apache Pekko Stream connector for publishing messages to Kafka topics.
  */
 object Producer {
 
@@ -203,7 +203,7 @@ object Producer {
    *
    * - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
    *
-   * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+   * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
    *
    * @tparam C the flow context type
    */
@@ -272,7 +272,7 @@ object Producer {
    *
    * - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
    *
-   * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+   * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
    *
    * Supports sharing a Kafka Producer instance.
    *
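
Put together with the consumer side sketched earlier, the context-propagating flow slots into a
consume-transform-produce pipeline roughly like this (broker address and topic names are
placeholders):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer }
    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.kafka._
    import org.apache.pekko.kafka.scaladsl.{ Committer, Consumer, Producer }
    import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl

    implicit val system: ActorSystem = ActorSystem("docs")

    val consumerSettings =
      ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers("localhost:9092")
        .withGroupId("docs-group")
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")

    val control =
      Consumer
        .sourceWithOffsetContext(consumerSettings, Subscriptions.topics("source-topic"))
        .map(r => ProducerMessage.single(new ProducerRecord("target-topic", r.key, r.value)))
        .via(Producer.flowWithContext(producerSettings))
        .toMat(Committer.sinkWithOffsetContext(CommitterSettings(system)))(DrainingControl.apply)
        .run()
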
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
index 58cd7aba..48948507 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
@@ -15,7 +15,7 @@ import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, RecordMetad
 import scala.concurrent.{ ExecutionContext, Future, Promise }
 
 /**
- * Utility class for producing to Kafka without using Akka Streams.
+ * Utility class for producing to Kafka without using Apache Pekko Streams.
  * @param settings producer settings used to create or access the [[org.apache.kafka.clients.producer.Producer]]
  */
 final class SendProducer[K, V] private (val settings: ProducerSettings[K, V], system: ActorSystem) {
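
A minimal use, for sending the occasional record without materializing a stream, might look like
this sketch (broker, topic and payload are placeholders):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.kafka.common.serialization.StringSerializer
    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.kafka.ProducerSettings
    import org.apache.pekko.kafka.scaladsl.SendProducer

    implicit val system: ActorSystem = ActorSystem("docs")

    val producer = SendProducer(
      ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092"))

    // completes with the record's RecordMetadata once the broker has acknowledged it
    val result = producer.send(new ProducerRecord("topic1", "key", "value"))
    result.andThen { case _ => producer.close() }(system.dispatcher)
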
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
index 526d519f..af864ee2 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition
 import scala.concurrent.Future
 
 /**
- * Akka Stream connector to support transactions between Kafka topics.
+ * Apache Pekko Stream connector to support transactions between Kafka topics.
  */
 object Transactional {
 
@@ -40,7 +40,7 @@ object Transactional {
   /**
    * API MAY CHANGE
    *
-   * This source is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+   * This source is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
    * and [[Transactional.flowWithOffsetContext]].
    */
   @ApiMayChange
@@ -122,7 +122,7 @@ object Transactional {
    * carries [[ConsumerMessage.PartitionOffset]] as context. The flow requires a unique `transactional.id` across all app
    * instances. The flow will override producer properties to enable Kafka exactly-once transactional support.
    *
-   * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+   * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
    * and [[Transactional.sourceWithOffsetContext]].
    */
   @ApiMayChange
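
End to end, the transactional consume-transform-produce loop described above has roughly this
shape (`consumerSettings`, `producerSettings` and the implicit ActorSystem as in the earlier
sketches; topics and the `transactional.id` are illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord
    import org.apache.pekko.kafka.{ ProducerMessage, Subscriptions }
    import org.apache.pekko.kafka.scaladsl.Transactional
    import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl

    val control =
      Transactional
        .source(consumerSettings, Subscriptions.topics("source-topic"))
        .map { msg =>
          ProducerMessage.single(
            new ProducerRecord("sink-topic", msg.record.key, msg.record.value),
            msg.partitionOffset)
        }
        .toMat(Transactional.sink(producerSettings, "docs-transactional-id"))(DrainingControl.apply)
        .run()
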
diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala
index aecf6f47..20a08996 100644
--- a/project/ProjectSettings.scala
+++ b/project/ProjectSettings.scala
@@ -15,7 +15,7 @@ object ProjectSettings {
     |
     |The build has three main modules:
     |  core - the Kafka connector sources
-    |  cluster-sharding - Akka Cluster External Sharding with the Apache Pekko Kafka Connector
+    |  cluster-sharding - Apache Pekko Cluster External Sharding with the Apache Pekko Kafka Connector
     |  tests - tests, Docker based integration tests, code for the documentation
     |  testkit - framework for testing the connector
     |
@@ -64,7 +64,7 @@ object ProjectSettings {
       url("https://github.com/apache/incubator-pekko-connectors-kafka/graphs/contributors")),
     startYear := Some(2022),
     licenses := Seq("Apache-2.0" -> url("https://opensource.org/licenses/Apache-2.0")),
-    description := "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Pekko.",
+    description := "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.",
     crossScalaVersions := Seq(Scala213),
     scalaVersion := Scala213,
     crossVersion := CrossVersion.binary,
@@ -87,7 +87,7 @@ object ProjectSettings {
       "-sourcepath",
       (ThisBuild / baseDirectory).value.toString,
       "-skip-packages",
-      "akka.pattern:scala", // for some reason Scaladoc creates this
+      "pekko.pattern:scala", // for some reason Scaladoc creates this
       "-doc-source-url", {
         val branch = if (isSnapshot.value) "master" else s"v${version.value}"
         s"https://github.com/apache/incubator-pekko-connectors-kafka/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
diff --git a/project/VersionGenerator.scala b/project/VersionGenerator.scala
index c8abf39e..104302a2 100644
--- a/project/VersionGenerator.scala
+++ b/project/VersionGenerator.scala
@@ -2,19 +2,19 @@ import sbt._
 import sbt.Keys._
 
 /**
- * Generate version.conf and akka/kafka/Version.scala files based on the version setting.
+ * Generate version.conf and org/apache/pekko/kafka/Version.scala files based on the version setting.
  *
- * This was adapted from https://github.com/akka/akka/blob/v2.6.8/project/VersionGenerator.scala
+ * This was adapted from https://github.com/apache/incubator-pekko/blob/main/project/VersionGenerator.scala
  */
 object VersionGenerator {
 
   val settings: Seq[Setting[_]] = inConfig(Compile)(
     Seq(
-      resourceGenerators += generateVersion(resourceManaged, _ / "version.conf", """|akka.kafka.version = "%s"
+      resourceGenerators += generateVersion(resourceManaged, _ / "version.conf", """|pekko.kafka.version = "%s"
          |"""),
       sourceGenerators += generateVersion(
         sourceManaged,
-        _ / "akka" / "kafka" / "Version.scala",
+        _ / "org" / "apache" / "pekko" / "kafka" / "Version.scala",
         """|package org.apache.pekko.kafka
          |
          |object Version {
diff --git a/project/Versions.scala b/project/Versions.scala
index ba78eccb..c380ed4d 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -16,7 +16,7 @@ object Versions {
   // Keep .scala-steward.conf pin in sync
   val kafkaVersion = "3.0.1"
   val KafkaVersionForDocs = "30"
-  // This should align with the ScalaTest version used in the Akka 2.6.x testkit
+  // This should align with the ScalaTest version used in the Apache Pekko 1.0.x testkit
   // https://github.com/apache/incubator-pekko/blob/main/project/Dependencies.scala#L70
   val scalaTestVersion = "3.1.4"
   val testcontainersVersion = "1.16.3"
diff --git a/project/project-info.conf b/project/project-info.conf
index 5b14426b..5ef8ad9a 100644
--- a/project/project-info.conf
+++ b/project/project-info.conf
@@ -12,7 +12,7 @@ project-info {
       text: "Github issues"
     }
     release-notes: {
-      url: "https://doc.akka.io/docs/alpakka-kafka/current/release-notes/index.html"
+      url: "https://pekko.apache.org/docs/pekko-connectors-kafka/current/release-notes/index.html"
       text: "In the documentation"
       new-tab: false
     }
diff --git a/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java b/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java
index 2763a122..0867b297 100644
--- a/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java
+++ b/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java
@@ -43,7 +43,7 @@ public class KafkaContainerCluster implements Startable {
   public static final Duration DEFAULT_CLUSTER_START_TIMEOUT = Duration.ofSeconds(360);
   public static final Duration DEFAULT_READINESS_CHECK_TIMEOUT = DEFAULT_CLUSTER_START_TIMEOUT;
 
-  private static final String LOGGING_NAMESPACE_PREFIX = "akka.kafka.testkit.testcontainers.logs";
+  private static final String LOGGING_NAMESPACE_PREFIX = "pekko.kafka.testkit.testcontainers.logs";
   private static final String READINESS_CHECK_SCRIPT = "/testcontainers_readiness_check.sh";
   private static final String READINESS_CHECK_TOPIC = "ready-kafka-container-cluster";
   private static final Version BOOTSTRAP_PARAM_MIN_VERSION = new Version("5.2.0");
diff --git a/testkit/src/main/resources/reference.conf b/testkit/src/main/resources/reference.conf
index 5142a855..988945e2 100644
--- a/testkit/src/main/resources/reference.conf
+++ b/testkit/src/main/resources/reference.conf
@@ -1,5 +1,5 @@
 # // #testkit-settings
-akka.kafka.testkit {
+pekko.kafka.testkit {
 
   # amount of time to wait until the desired cluster state is reached
   cluster-timeout = 10 seconds
@@ -13,18 +13,18 @@ akka.kafka.testkit {
 # // #testkit-settings
 
 # // #testkit-testcontainers-settings
-akka.kafka.testkit.testcontainers {
+pekko.kafka.testkit.testcontainers {
 
   # define these settings to select a different Kafka/ZooKeeper docker image
   # we recommend using Confluent Platform docker images and using the same version across all images
   # Confluent publishes images on DockerHub: https://hub.docker.com/r/confluentinc/cp-kafka/tags
   # Kafka versions in Confluent Platform: https://docs.confluent.io/current/installation/versions-interoperability.html
   zookeeper-image = "confluentinc/cp-zookeeper"
-  zookeeper-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
+  zookeeper-image-tag = ${pekko.kafka.testkit.testcontainers.confluent-platform-version}
   kafka-image = "confluentinc/cp-kafka"
-  kafka-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
+  kafka-image-tag = ${pekko.kafka.testkit.testcontainers.confluent-platform-version}
   schema-registry-image = "confluentinc/cp-schema-registry"
-  schema-registry-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
+  schema-registry-image-tag = ${pekko.kafka.testkit.testcontainers.confluent-platform-version}
   # See https://docs.confluent.io/platform/current/installation/versions-interoperability.html
   confluent-platform-version = "7.0.0"
 
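
Because all three image tags substitute the single
`pekko.kafka.testkit.testcontainers.confluent-platform-version` key, overriding that key re-tags
ZooKeeper, Kafka and the schema registry in one step. A sketch that makes the resolution visible
(the version number is just an example; in a real project the override would simply live in
application.conf):

    import com.typesafe.config.ConfigFactory

    val merged = ConfigFactory
      .parseString("""pekko.kafka.testkit.testcontainers.confluent-platform-version = "7.0.5" """)
      .withFallback(ConfigFactory.parseResources("reference.conf"))
      .resolve()

    // the substitution picks up the override for every image tag
    merged.getString("pekko.kafka.testkit.testcontainers.kafka-image-tag") // "7.0.5"
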
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala
index bdf541e1..e10d6b7e 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala
@@ -31,7 +31,7 @@ class KafkaTestkitSettings private (val clusterTimeout: FiniteDuration,
 }
 
 object KafkaTestkitSettings {
-  final val ConfigPath = "akka.kafka.testkit"
+  final val ConfigPath = "pekko.kafka.testkit"
 
   /**
    * Create testkit settings from ActorSystem settings.
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
index f9394f97..a7786dd9 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
@@ -283,7 +283,7 @@ final class KafkaTestkitTestcontainersSettings private (
 }
 
 object KafkaTestkitTestcontainersSettings {
-  final val ConfigPath = "akka.kafka.testkit.testcontainers"
+  final val ConfigPath = "pekko.kafka.testkit.testcontainers"
 
   /**
    * Create testkit testcontainers settings from ActorSystem settings.
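
Given an ActorSystem whose config contains the section above, the settings can be read and
adjusted programmatically; a small sketch (the three-broker cluster is only an illustration):

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.kafka.testkit.KafkaTestkitTestcontainersSettings

    val system = ActorSystem("docs")
    val testcontainersSettings =
      KafkaTestkitTestcontainersSettings(system)
        .withNumBrokers(3)
        .withInternalTopicsReplicationFactor(2)
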
diff --git a/tests/src/it/resources/reference.conf b/tests/src/it/resources/reference.conf
index 13c73da6..da2d68f5 100644
--- a/tests/src/it/resources/reference.conf
+++ b/tests/src/it/resources/reference.conf
@@ -1,7 +1,7 @@
-akka {
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
+pekko {
+  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
   loglevel = "DEBUG"
-  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
   logger-startup-timeout = 15s
 
   actor {
@@ -14,7 +14,7 @@ akka {
   test {
     # https://github.com/akka/alpakka-kafka/pull/994
     timefactor = 3.0
-    timefactor = ${?AKKA_TEST_TIMEFACTOR}
+    timefactor = ${?PEKKO_TEST_TIMEFACTOR}
     single-expect-default = 10s
   }
 
diff --git a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
index b2f07c30..2a7ad93f 100644
--- a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
+++ b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
@@ -86,7 +86,7 @@ class ConsumerExampleTest extends TestcontainersKafkaTest {
   }
 
   // #settings
-  final Config config = system.settings().config().getConfig("akka.kafka.consumer");
+  final Config config = system.settings().config().getConfig("pekko.kafka.consumer");
   final ConsumerSettings<String, byte[]> consumerSettings =
       ConsumerSettings.create(config, new StringDeserializer(), new ByteArrayDeserializer())
           .withBootstrapServers("localhost:9092")
diff --git a/tests/src/test/java/docs/javadsl/ProducerTest.java b/tests/src/test/java/docs/javadsl/ProducerTest.java
index 1eada873..ae3094ad 100644
--- a/tests/src/test/java/docs/javadsl/ProducerTest.java
+++ b/tests/src/test/java/docs/javadsl/ProducerTest.java
@@ -68,7 +68,7 @@ class ProducerTest extends TestcontainersKafkaTest {
   void createProducer() {
     // #producer
     // #settings
-    final Config config = system.settings().config().getConfig("akka.kafka.producer");
+    final Config config = system.settings().config().getConfig("pekko.kafka.producer");
     final ProducerSettings<String, String> producerSettings =
         ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
             .withBootstrapServers("localhost:9092");
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
index 7bfceefc..55132734 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf
@@ -11,12 +11,12 @@ pekko {
   test {
     # https://github.com/akka/alpakka-kafka/pull/994
     timefactor = 3.0
-    timefactor = ${?AKKA_TEST_TIMEFACTOR}
+    timefactor = ${?PEKKO_TEST_TIMEFACTOR}
     single-expect-default = 10s
   }
 }
 
-akka {
+pekko {
   kafka {
     consumer {
       stop-timeout = 10ms
@@ -30,10 +30,10 @@ akka {
 }
 
 # default is 10 seconds
-# akka.kafka.testkit.consumer-group-timeout = 20 seconds
+# pekko.kafka.testkit.consumer-group-timeout = 20 seconds
 
 # #consumer-config-inheritance
-our-kafka-consumer: ${akka.kafka.consumer} {
+our-kafka-consumer: ${pekko.kafka.consumer} {
   kafka-clients {
     bootstrap.servers = "kafka-host:9092"
   }
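
The inherited section is then passed to the settings factory in place of the default config path;
a sketch (the deserializer choice is illustrative):

    import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer }
    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.kafka.ConsumerSettings

    implicit val system: ActorSystem = ActorSystem("docs")

    // bootstrap.servers comes from `our-kafka-consumer`; everything else falls
    // back to the `pekko.kafka.consumer` defaults the section extends
    val consumerSettings =
      ConsumerSettings(
        system.settings.config.getConfig("our-kafka-consumer"),
        new StringDeserializer,
        new ByteArrayDeserializer)
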
diff --git a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
index 1a0ed5db..46b01f5c 100644
--- a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
@@ -53,7 +53,7 @@ object ClusterShardingExample {
   // #message-extractor
 
   // #setup-cluster-sharding
-  // create an Akka Cluster Sharding `EntityTypeKey` for `User` for this Kafka Consumer Group
+  // create an Apache Pekko Cluster Sharding `EntityTypeKey` for `User` for this Kafka Consumer Group
   val groupId = "user-topic-group-id"
   val typeKey = EntityTypeKey[User](groupId)
 
@@ -69,11 +69,11 @@ object ClusterShardingExample {
   // #setup-cluster-sharding
 
   // #rebalance-listener
-  // obtain an Akka classic ActorRef that will handle consumer group rebalance events
+  // obtain an Apache Pekko classic ActorRef that will handle consumer group rebalance events
   val rebalanceListener: org.apache.pekko.actor.typed.ActorRef[ConsumerRebalanceEvent] =
     KafkaClusterSharding(system.toClassic).rebalanceListener(typeKey)
 
-  // convert the rebalance listener to a classic ActorRef until Alpakka Kafka supports Akka Typed
+  // convert the rebalance listener to a classic ActorRef until Apache Pekko Connector Kafka supports Apache Pekko Typed
   import org.apache.pekko.actor.typed.scaladsl.adapter._
   val rebalanceListenerClassic: org.apache.pekko.actor.ActorRef = rebalanceListener.toClassic
 
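
The classic ActorRef obtained above is then attached to the consumer's subscription, so the
sharding rebalance listener is told about partition hand-offs; a sketch (the topic name is
illustrative, `rebalanceListenerClassic` is the value from the excerpt above):

    import org.apache.pekko.kafka.Subscriptions

    val subscription = Subscriptions
      .topics("user-topic")
      .withRebalanceListener(rebalanceListenerClassic)
    // hand `subscription` to Consumer.committableSource / sourceWithOffsetContext etc.
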
diff --git a/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala b/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
index a22abd85..1c816eb8 100644
--- a/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
@@ -84,7 +84,7 @@ class ConsumerExample extends DocsSpecBase with TestcontainersKafkaLike {
 
   def createSettings(): ConsumerSettings[String, Array[Byte]] = {
     // #settings
-    val config = system.settings.config.getConfig("akka.kafka.consumer")
+    val config = system.settings.config.getConfig("pekko.kafka.consumer")
     val consumerSettings =
       ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
         .withBootstrapServers(bootstrapServers)
diff --git a/tests/src/test/scala/docs/scaladsl/ProducerExample.scala b/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
index 5899fbbc..3bfb5f38 100644
--- a/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
@@ -27,7 +27,7 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
   "Creating a producer" should "work" in {
     // #producer
     // #settings
-    val config = system.settings.config.getConfig("akka.kafka.producer")
+    val config = system.settings.config.getConfig("pekko.kafka.producer")
     val producerSettings =
       ProducerSettings(config, new StringSerializer, new StringSerializer)
         .withBootstrapServers(bootstrapServers)
@@ -42,7 +42,7 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
   }
 
   "PlainSink" should "work" in assertAllStagesStopped {
-    val config = system.settings.config.getConfig("akka.kafka.producer")
+    val config = system.settings.config.getConfig("pekko.kafka.producer")
     val producerSettings =
       ProducerSettings(config, new StringSerializer, new StringSerializer)
         .withBootstrapServers(bootstrapServers)
@@ -95,7 +95,7 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
   }
 
   "Metrics" should "be observed" in assertAllStagesStopped {
-    val config = system.settings.config.getConfig("akka.kafka.producer")
+    val config = system.settings.config.getConfig("pekko.kafka.producer")
     val producerSettings =
       ProducerSettings(config, new StringSerializer, new StringSerializer)
         .withBootstrapServers(bootstrapServers)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
index f0f08c7c..4e616dc4 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
@@ -30,11 +30,11 @@ class ConsumerSettingsSpec
 
     "handle nested kafka-clients properties" in {
       val conf = ConfigFactory.parseString("""
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.bootstrap.foo = baz
-        akka.kafka.consumer.kafka-clients.foo = bar
-        akka.kafka.consumer.kafka-clients.client.id = client1
-        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.bootstrap.foo = baz
+        pekko.kafka.consumer.kafka-clients.foo = bar
+        pekko.kafka.consumer.kafka-clients.client.id = client1
+        """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
       val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new StringDeserializer)
       settings.getProperty("bootstrap.servers") should ===("localhost:9092")
       settings.getProperty("client.id") should ===("client1")
@@ -46,22 +46,22 @@ class ConsumerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.client.id = client1
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.consumer")
+        .getConfig("pekko.kafka.consumer")
       val settings = ConsumerSettings(conf, None, None)
       settings.getProperty("bootstrap.servers") should ===("localhost:9092")
     }
 
     "handle deserializers passed as args config" in {
       val conf = ConfigFactory.parseString("""
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.parallelism = 1
-        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.parallelism = 1
+        """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
       val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new StringDeserializer)
       settings.getProperty("bootstrap.servers") should ===("localhost:9092")
     }
@@ -70,12 +70,12 @@ class ConsumerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.client.id = client1
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.consumer")
+        .getConfig("pekko.kafka.consumer")
       val settings = ConsumerSettings(conf, Some(new ByteArrayDeserializer), None)
       settings.getProperty("bootstrap.servers") should ===("localhost:9092")
     }
@@ -84,18 +84,18 @@ class ConsumerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.client.id = client1
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.consumer")
+        .getConfig("pekko.kafka.consumer")
       val settings = ConsumerSettings(conf, None, Some(new ByteArrayDeserializer))
       settings.getProperty("bootstrap.servers") should ===("localhost:9092")
     }
 
     "filter passwords from kafka-clients properties" in {
-      val conf = ConfigFactory.load().getConfig("akka.kafka.consumer")
+      val conf = ConfigFactory.load().getConfig("pekko.kafka.consumer")
       val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new StringDeserializer)
         .withProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hemligt")
         .withProperty("ssl.truststore.password", "geheim")
@@ -110,12 +110,12 @@ class ConsumerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.client.id = client1
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.consumer")
+        .getConfig("pekko.kafka.consumer")
       val exception = intercept[IllegalArgumentException] {
         ConsumerSettings(conf, None, None)
       }
@@ -125,9 +125,9 @@ class ConsumerSettingsSpec
 
     "throw IllegalArgumentException if no value deserializer defined (null case). Key serializer passed as args config" in {
       val conf = ConfigFactory.parseString("""
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.client.id = client1
-        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.client.id = client1
+        """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
       val exception = intercept[IllegalArgumentException] {
         ConsumerSettings(conf, new ByteArrayDeserializer, null)
       }
@@ -139,12 +139,12 @@ class ConsumerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.client.id = client1
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.consumer")
+        .getConfig("pekko.kafka.consumer")
       val exception = intercept[IllegalArgumentException] {
         ConsumerSettings(conf, None, null)
       }
@@ -156,12 +156,12 @@ class ConsumerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.client.id = client1
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.consumer")
+        .getConfig("pekko.kafka.consumer")
       val exception = intercept[IllegalArgumentException] {
         ConsumerSettings(conf, None, None)
       }
@@ -171,9 +171,9 @@ class ConsumerSettingsSpec
 
     "throw IllegalArgumentException if no key deserializer defined (null case). Value serializer passed as args config" in {
       val conf = ConfigFactory.parseString("""
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.client.id = client1
-        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.client.id = client1
+        """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
       val exception = intercept[IllegalArgumentException] {
         ConsumerSettings(conf, null, new ByteArrayDeserializer)
       }
@@ -185,12 +185,12 @@ class ConsumerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
-        akka.kafka.consumer.kafka-clients.client.id = client1
+        pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+        pekko.kafka.consumer.kafka-clients.client.id = client1
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.consumer")
+        .getConfig("pekko.kafka.consumer")
       val exception = intercept[IllegalArgumentException] {
         ConsumerSettings(conf, null, None)
       }
@@ -244,7 +244,7 @@ object ConsumerSettingsSpec {
   val DiscoveryConfigSection =
     s"""
        // #discovery-service
-      discovery-consumer: $${akka.kafka.consumer} {
+      discovery-consumer: $${pekko.kafka.consumer} {
         service-name = "kafkaService1"
         resolve-timeout = 10 ms
       }
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala
index 9da53069..31f85eec 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala
@@ -30,22 +30,22 @@ class ProducerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
-        akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.producer")
+        .getConfig("pekko.kafka.producer")
       val settings = ProducerSettings(conf, None, None)
       settings.properties("bootstrap.servers") should ===("localhost:9092")
     }
 
     "handle serializers passed as args config" in {
       val conf = ConfigFactory.parseString("""
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.producer")
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.producer")
       val settings = ProducerSettings(conf, new ByteArraySerializer, new StringSerializer)
       settings.properties("bootstrap.servers") should ===("localhost:9092")
     }
@@ -54,12 +54,12 @@ class ProducerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.producer")
+        .getConfig("pekko.kafka.producer")
       val settings = ProducerSettings(conf, Some(new ByteArraySerializer), None)
       settings.properties("bootstrap.servers") should ===("localhost:9092")
     }
@@ -68,12 +68,12 @@ class ProducerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.producer")
+        .getConfig("pekko.kafka.producer")
       val settings = ProducerSettings(conf, None, Some(new ByteArraySerializer))
       settings.properties("bootstrap.servers") should ===("localhost:9092")
     }
@@ -94,12 +94,12 @@ class ProducerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.producer")
+        .getConfig("pekko.kafka.producer")
       val exception = intercept[IllegalArgumentException] {
         ProducerSettings(conf, None, None)
       }
@@ -109,9 +109,9 @@ class ProducerSettingsSpec
 
     "throw IllegalArgumentException if no value serializer defined (null case). Key serializer passed as args config" in {
       val conf = ConfigFactory.parseString("""
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.producer")
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.producer")
       val exception = intercept[IllegalArgumentException] {
         ProducerSettings(conf, new ByteArraySerializer, null)
       }
@@ -123,12 +123,12 @@ class ProducerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.producer")
+        .getConfig("pekko.kafka.producer")
       val exception = intercept[IllegalArgumentException] {
         ProducerSettings(conf, None, null)
       }
@@ -140,12 +140,12 @@ class ProducerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.producer")
+        .getConfig("pekko.kafka.producer")
       val exception = intercept[IllegalArgumentException] {
         ProducerSettings(conf, None, None)
       }
@@ -155,9 +155,9 @@ class ProducerSettingsSpec
 
     "throw IllegalArgumentException if no key serializer defined (null case). Value serializer passed as args config" in {
       val conf = ConfigFactory.parseString("""
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.producer")
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.producer")
       val exception = intercept[IllegalArgumentException] {
         ProducerSettings(conf, null, new ByteArraySerializer)
       }
@@ -169,12 +169,12 @@ class ProducerSettingsSpec
       val conf = ConfigFactory
         .parseString(
           """
-        akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
-        akka.kafka.producer.kafka-clients.parallelism = 1
-        akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+        pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+        pekko.kafka.producer.kafka-clients.parallelism = 1
+        pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
         """)
         .withFallback(ConfigFactory.load())
-        .getConfig("akka.kafka.producer")
+        .getConfig("pekko.kafka.producer")
       val exception = intercept[IllegalArgumentException] {
         ProducerSettings(conf, null, None)
       }
@@ -231,7 +231,7 @@ object ProducerSettingsSpec {
   val DiscoveryConfigSection =
     s"""
         // #discovery-service
-        discovery-producer: $${akka.kafka.producer} {
+        discovery-producer: $${pekko.kafka.producer} {
           service-name = "kafkaService1"
           resolve-timeout = 10 ms
         }
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index 099c28c6..ebbd2ab5 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -68,7 +68,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
     this(
       ActorSystem("CommittingWithMockSpec",
         ConfigFactory
-          .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+          .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
           .withFallback(ConfigFactory.load())))
 
   override def afterAll(): Unit =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index 37f66b39..40cdc909 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -64,7 +64,7 @@ class ConsumerSpec(_system: ActorSystem)
     this(
       ActorSystem("ConsumerSpec",
         ConfigFactory
-          .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+          .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
           .withFallback(ConfigFactory.load())))
 
   override def afterAll(): Unit =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 1e1537b4..1c211cc6 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -50,7 +50,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
     this(
       ActorSystem("PartitionedSourceSpec",
         ConfigFactory
-          .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+          .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
           .withFallback(ConfigFactory.load())))
 
   override def afterAll(): Unit =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 6af86887..b3e8a8f1 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -49,7 +49,7 @@ class ProducerSpec(_system: ActorSystem)
     this(
       ActorSystem("ProducerSpec",
         ConfigFactory
-          .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+          .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
           .withFallback(ConfigFactory.load())))
 
   override def afterAll(): Unit = shutdown(system)
@@ -200,7 +200,7 @@ class ProducerSpec(_system: ActorSystem)
     source.sendError(sourceError)
 
     // Here we cannot be sure that all messages from the source were delivered to the producer,
-    // because of buffers in akka-stream and faster error pushing that ignores buffers
+    // because pekko-stream buffers elements and the error signal can overtake them, bypassing those buffers
     sink.expectError(sourceError)
 
     client.verifyClosed()
@@ -546,7 +546,7 @@ class ProducerSpec(_system: ActorSystem)
     source.sendError(new Exception())
 
     // Here we cannot be sure that all messages from the source were delivered to the producer,
-    // because of buffers in akka-stream and faster error pushing that ignores buffers
+    // because pekko-stream buffers elements and the error signal can overtake them, bypassing those buffers
     // TODO: we can await a tx to be initialized before sending the error (which means the producer was assigned and first msg processed). does that invalidate this test?
 
     Await.ready(sink, remainingOrDefault)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
index 671adfaf..029b9fc8 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
@@ -235,7 +235,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside
         Await.result(produce(topic, 1 to 100), remainingOrDefault)
 
         // Create ConsumerActor manually
-        // https://doc.akka.io/docs/akka-stream-kafka/0.20/consumer.html#sharing-kafkaconsumer
+        // https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#sharing-kafkaconsumer
         val consumer = system.actorOf(KafkaConsumerActor.props(consumerDefaults.withGroupId(group)))
 
         // Timeout for metadata fetching requests
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
index b1876576..4f31a761 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
@@ -239,13 +239,13 @@ object PekkoConnectorsAssignor {
  * client id so that we can filter them during assignment. The member id is a concatenation of the client id and the
  * group member instance id that's generated by the Consumer Group coordinator.
  *
- * Pass a client.id -> Set[TopicPartition] map to `AlpakkaAssignor.clientIdToPartitionMap` **before** you anticipate a
+ * Pass a client.id -> Set[TopicPartition] map to `PekkoConnectorsAssignor.clientIdToPartitionMap` **before** you anticipate a
  * rebalance to occur in your test.
  */
 class PekkoConnectorsAssignor extends AbstractPartitionAssignor {
   val log: Logger = LoggerFactory.getLogger(getClass)
 
-  override def name(): String = "alpakka-test"
+  override def name(): String = "pekko-connector-kafka-test"
 
   override def assign(
       partitionsPerTopic: util.Map[String, Integer],
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
index 93156a79..ef44b47b 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
@@ -10,7 +10,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent
 import ch.qos.logback.core.AppenderBase
 
 /**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
  *
  * INTERNAL API
  */
@@ -34,13 +34,13 @@ import ch.qos.logback.core.AppenderBase
 }
 
 /**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
  *
  * INTERNAL API
  *
  * Logging from tests can be silenced by this appender. When there is a test failure
  * the captured logging events are flushed to the appenders defined for the
  * org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
  *
  * The flushing on test failure is handled by [[org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing]]
  * for ScalaTest and [[org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing]] for JUnit.
@@ -48,9 +48,9 @@ import ch.qos.logback.core.AppenderBase
  * Use configuration like the following in logback-test.xml:
  *
  * {{{
- *     <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
+ *     <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />
  *
- *     <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
+ *     <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
  *       <appender-ref ref="STDOUT"/>
  *     </logger>
  *
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala
index 00d24454..7009c22c 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala
@@ -10,7 +10,7 @@ import ch.qos.logback.classic.Level
 import org.slf4j.LoggerFactory
 
 /**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
  *
  * INTERNAL API
  */
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala
index 50efff7a..25cfa7f3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala
@@ -14,7 +14,7 @@ import org.junit.runners.model.Statement
 import org.slf4j.LoggerFactory
 
 /**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
  *
  * JUnit `TestRule` to make log lines appear only when the test fails.
  *
@@ -26,9 +26,9 @@ import org.slf4j.LoggerFactory
  * Requires Logback and configuration like the following in logback-test.xml:
  *
  * {{{
- *     <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
+ *     <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />
  *
- *     <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
+ *     <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
  *       <appender-ref ref="STDOUT"/>
  *     </logger>
  *
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala
index 701308cc..3236b825 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala
@@ -15,16 +15,16 @@ import org.scalatest.TestSuite
 import org.slf4j.LoggerFactory
 
 /**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
  *
  * Mix this trait into a ScalaTest test to make log lines appear only when the test fails.
  *
  * Requires Logback and configuration like the following in logback-test.xml:
  *
  * {{{
- *     <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
+ *     <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />
  *
- *     <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
+ *     <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
  *       <appender-ref ref="STDOUT"/>
  *     </logger>
  *

