You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2023/02/09 23:06:05 UTC
[incubator-pekko-connectors-kafka] branch main updated: Using `pekko` instead of `akka` in reference.conf and related configurations (#30)
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 7ee59d4d Using `pekko` instead of `akka` in reference.conf and related configurations (#30)
7ee59d4d is described below
commit 7ee59d4d7e6808d2141fb6d02143bbedf6f222ef
Author: Seeta Ramayya <35...@users.noreply.github.com>
AuthorDate: Fri Feb 10 00:05:59 2023 +0100
Using `pekko` instead of `akka` in reference.conf and related configurations (#30)
* Configuration : Using pekko instead of akka in reference.conf and other conf files
* Replaced akka documentation url with pekko documentation url
---
.github/workflows/check-build-test.yml | 2 +-
.../pekko/kafka/benchmarks/BatchedConsumer.scala | 7 +-
.../apache/pekko/kafka/benchmarks/Benchmarks.scala | 8 +-
.../kafka/benchmarks/NoCommitBackpressure.scala | 2 +-
.../PekkoConnectorsCommittableProducer.scala | 8 +-
.../apache/pekko/kafka/benchmarks/Producer.scala | 2 +-
.../pekko/kafka/benchmarks/Transactions.scala | 4 +-
benchmarks/src/main/resources/reference.conf | 6 +-
.../pekko/kafka/benchmarks/InflightMetrics.scala | 2 +-
.../cluster/sharding/KafkaClusterSharding.scala | 20 ++---
core/src/main/resources/reference.conf | 42 +++++-----
.../org/apache/pekko/kafka/CommitterSettings.scala | 22 +++---
.../pekko/kafka/ConnectionCheckerSettings.scala | 2 +-
.../org/apache/pekko/kafka/ConsumerSettings.scala | 38 ++++-----
.../kafka/OffsetResetProtectionSettings.scala | 2 +-
.../org/apache/pekko/kafka/ProducerSettings.scala | 38 ++++-----
.../kafka/internal/ConsumerResetProtection.scala | 4 +-
.../pekko/kafka/internal/LoggingWithId.scala | 4 +-
.../pekko/kafka/internal/SourceLogicBuffer.scala | 2 +-
.../org/apache/pekko/kafka/javadsl/Consumer.scala | 6 +-
.../org/apache/pekko/kafka/javadsl/Producer.scala | 6 +-
.../apache/pekko/kafka/javadsl/SendProducer.scala | 4 +-
.../apache/pekko/kafka/javadsl/Transactional.scala | 6 +-
.../org/apache/pekko/kafka/scaladsl/Consumer.scala | 16 ++--
.../pekko/kafka/scaladsl/DiscoverySupport.scala | 8 +-
.../org/apache/pekko/kafka/scaladsl/Producer.scala | 6 +-
.../apache/pekko/kafka/scaladsl/SendProducer.scala | 2 +-
.../pekko/kafka/scaladsl/Transactional.scala | 6 +-
project/ProjectSettings.scala | 6 +-
project/VersionGenerator.scala | 8 +-
project/Versions.scala | 2 +-
project/project-info.conf | 2 +-
.../testkit/internal/KafkaContainerCluster.java | 2 +-
testkit/src/main/resources/reference.conf | 10 +--
.../pekko/kafka/testkit/KafkaTestkitSettings.scala | 2 +-
.../KafkaTestkitTestcontainersSettings.scala | 2 +-
tests/src/it/resources/reference.conf | 8 +-
.../java/docs/javadsl/ConsumerExampleTest.java | 2 +-
tests/src/test/java/docs/javadsl/ProducerTest.java | 2 +-
tests/src/test/resources/application.conf | 8 +-
.../docs/scaladsl/ClusterShardingExample.scala | 6 +-
.../test/scala/docs/scaladsl/ConsumerExample.scala | 2 +-
.../test/scala/docs/scaladsl/ProducerExample.scala | 6 +-
.../apache/pekko/kafka/ConsumerSettingsSpec.scala | 90 +++++++++++-----------
.../apache/pekko/kafka/ProducerSettingsSpec.scala | 78 +++++++++----------
.../kafka/internal/CommittingWithMockSpec.scala | 2 +-
.../apache/pekko/kafka/internal/ConsumerSpec.scala | 2 +-
.../kafka/internal/PartitionedSourceSpec.scala | 2 +-
.../apache/pekko/kafka/internal/ProducerSpec.scala | 6 +-
.../pekko/kafka/scaladsl/IntegrationSpec.scala | 2 +-
.../pekko/kafka/scaladsl/RebalanceSpec.scala | 4 +-
.../pekko/kafka/tests/CapturingAppender.scala | 10 +--
.../org/apache/pekko/kafka/tests/LogbackUtil.scala | 2 +-
.../kafka/tests/javadsl/LogCapturingJunit4.scala | 6 +-
.../pekko/kafka/tests/scaladsl/LogCapturing.scala | 6 +-
55 files changed, 277 insertions(+), 276 deletions(-)
diff --git a/.github/workflows/check-build-test.yml b/.github/workflows/check-build-test.yml
index bcc82532..7288ceb2 100644
--- a/.github/workflows/check-build-test.yml
+++ b/.github/workflows/check-build-test.yml
@@ -10,7 +10,7 @@ on:
tags-ignore: [ v.* ]
env:
- AKKA_TEST_TIMEFACTOR: 10.0
+ PEKKO_TEST_TIMEFACTOR: 10.0
EVENT_NAME: ${{ github.event_name }}
jobs:
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala
index 891a00a8..d7955a88 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/BatchedConsumer.scala
@@ -39,14 +39,14 @@ class ApacheKafkaBatchedConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaBatchedConsumer extends BenchmarksBase() {
it should "bench with small messages" in {
- val cmd = RunTestCommand("alpakka-kafka-batched-consumer", bootstrapServers, topic_1000_100)
+ val cmd = RunTestCommand("pekko-connectors-kafka-batched-consumer", bootstrapServers, topic_1000_100)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.committableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
}
it should "bench with normal messages" in {
- val cmd = RunTestCommand("alpakka-kafka-batched-consumer-normal-msg", bootstrapServers, topic_1000_5000)
+ val cmd = RunTestCommand("pekko-connectors-kafka-batched-consumer-normal-msg", bootstrapServers, topic_1000_5000)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.committableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
@@ -54,7 +54,8 @@ class PekkoConnectorsKafkaBatchedConsumer extends BenchmarksBase() {
it should "bench with normal messages and eight partitions" in {
val cmd =
- RunTestCommand("alpakka-kafka-batched-consumer-normal-msg-8-partitions", bootstrapServers, topic_1000_5000_8)
+ RunTestCommand("pekko-connectors-kafka-batched-consumer-normal-msg-8-partitions", bootstrapServers,
+ topic_1000_5000_8)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.committableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumerAtLeastOnceBatched(batchSize = 1000))
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala
index a72ddb8e..78c8ca1a 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Benchmarks.scala
@@ -63,7 +63,7 @@ class ApacheKafkaConsumerNokafka extends BenchmarksBase() {
class PekkoConnectorsKafkaConsumerNokafka extends BenchmarksBase() {
it should "bench" in {
- val cmd = RunTestCommand("alpakka-kafka-plain-consumer-nokafka", bootstrapServers, topic_2000_100)
+ val cmd = RunTestCommand("pekko-connectors-kafka-plain-consumer-nokafka", bootstrapServers, topic_2000_100)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.noopFixtureGen(cmd),
ReactiveKafkaConsumerBenchmarks.consumePlainNoKafka)
@@ -85,13 +85,13 @@ class ApacheKafkaPlainConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaPlainConsumer extends BenchmarksBase() {
it should "bench" in {
- val cmd = RunTestCommand("alpakka-kafka-plain-consumer", bootstrapServers, topic_2000_100)
+ val cmd = RunTestCommand("pekko-connectors-kafka-plain-consumer", bootstrapServers, topic_2000_100)
runPerfTest(cmd, ReactiveKafkaConsumerFixtures.plainSources(cmd), ReactiveKafkaConsumerBenchmarks.consumePlain)
}
it should "bench with normal messages and one hundred partitions with inflight metrics" in {
val cmd =
- RunTestCommand("alpakka-kafka-plain-consumer-normal-msg-100-partitions-with-inflight-metrics",
+ RunTestCommand("pekko-connectors-kafka-plain-consumer-normal-msg-100-partitions-with-inflight-metrics",
bootstrapServers,
topic_1000_5000_100)
val consumerMetricNames = List[ConsumerMetricRequest](
@@ -129,7 +129,7 @@ class ApacheKafkaAtMostOnceConsumer extends BenchmarksBase() {
class PekkoConnectorsKafkaAtMostOnceConsumer extends BenchmarksBase() {
it should "bench" in {
- val cmd = RunTestCommand("alpakka-kafka-at-most-once-consumer", bootstrapServers, topic_50_100)
+ val cmd = RunTestCommand("pekko-connectors-kafka-at-most-once-consumer", bootstrapServers, topic_50_100)
runPerfTest(cmd,
ReactiveKafkaConsumerFixtures.committableSources(cmd),
ReactiveKafkaConsumerBenchmarks.consumeCommitAtMostOnce)
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala
index d7e66849..ecb8d464 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/NoCommitBackpressure.scala
@@ -39,7 +39,7 @@ class RawKafkaCommitEveryPollConsumer extends BenchmarksBase() {
}
class PekkoConnectorsCommitAndForgetConsumer extends BenchmarksBase() {
- val prefix = "alpakka-kafka-commit-and-forget-"
+ val prefix = "pekko-connectors-kafka-commit-and-forget-"
it should "bench with small messages" in {
val cmd = RunTestCommand(prefix + "consumer", bootstrapServers, topic_1000_100)
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala
index 4713f41b..906f7c71 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableProducer.scala
@@ -14,7 +14,7 @@ import org.apache.pekko.kafka.benchmarks.app.RunTestCommand
*/
class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
it should "bench composed sink with 100b messages" in {
- val cmd = RunTestCommand("alpakka-committable-producer-composed", bootstrapServers, topic_100_100)
+ val cmd = RunTestCommand("pekko-connectors-committable-producer-composed", bootstrapServers, topic_100_100)
runPerfTest(
cmd,
PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
@@ -22,7 +22,7 @@ class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
}
it should "bench composed sink with 5000b messages" in {
- val cmd = RunTestCommand("alpakka-committable-producer-composed-5000b", bootstrapServers, topic_100_5000)
+ val cmd = RunTestCommand("pekko-connectors-committable-producer-composed-5000b", bootstrapServers, topic_100_5000)
runPerfTest(
cmd,
PekkoConnectorsCommittableSinkFixtures.composedSink(cmd),
@@ -30,7 +30,7 @@ class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
}
it should "bench `Producer.committableSink` with 100b messages" in {
- val cmd = RunTestCommand("alpakka-committable-producer", bootstrapServers, topic_100_100)
+ val cmd = RunTestCommand("pekko-connectors-committable-producer", bootstrapServers, topic_100_100)
runPerfTest(
cmd,
PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
@@ -38,7 +38,7 @@ class PekkoConnectorsCommittableProducer extends BenchmarksBase() {
}
it should "bench `Producer.committableSink` with 5000b messages" in {
- val cmd = RunTestCommand("alpakka-committable-producer-5000b", bootstrapServers, topic_100_5000)
+ val cmd = RunTestCommand("pekko-connectors-committable-producer-5000b", bootstrapServers, topic_100_5000)
runPerfTest(
cmd,
PekkoConnectorsCommittableSinkFixtures.producerSink(cmd),
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala
index d6441081..17b093c5 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Producer.scala
@@ -40,7 +40,7 @@ class ApacheKafkaPlainProducer extends BenchmarksBase() {
}
class PekkoConnectorsKafkaPlainProducer extends BenchmarksBase() {
- private val prefix = "alpakka-kafka-plain-producer"
+ private val prefix = "pekko-connector-kafka-plain-producer"
it should "bench with small messages" in {
val cmd = RunTestCommand(prefix, bootstrapServers, topic_2000_100.freshTopic)
diff --git a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala
index a2348a84..fda70f25 100644
--- a/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala
+++ b/benchmarks/src/it/scala/org/apache/pekko/kafka/benchmarks/Transactions.scala
@@ -28,7 +28,7 @@ class ApacheKafkaTransactions extends BenchmarksBase() {
class PekkoConnectorsKafkaTransactions extends BenchmarksBase() {
it should "bench with small messages" in {
- val cmd = RunTestCommand("alpakka-kafka-transactions", bootstrapServers, topic_100_100)
+ val cmd = RunTestCommand("pekko-connectors-kafka-transactions", bootstrapServers, topic_100_100)
runPerfTest(
cmd,
ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
@@ -36,7 +36,7 @@ class PekkoConnectorsKafkaTransactions extends BenchmarksBase() {
}
it should "bench with normal messages" in {
- val cmd = RunTestCommand("alpakka-kafka-transactions-normal-msg", bootstrapServers, topic_100_5000)
+ val cmd = RunTestCommand("pekko-connectors-kafka-transactions-normal-msg", bootstrapServers, topic_100_5000)
runPerfTest(
cmd,
ReactiveKafkaTransactionFixtures.transactionalSourceAndSink(cmd, commitInterval = 100.milliseconds),
diff --git a/benchmarks/src/main/resources/reference.conf b/benchmarks/src/main/resources/reference.conf
index 594f20a6..a158d31c 100644
--- a/benchmarks/src/main/resources/reference.conf
+++ b/benchmarks/src/main/resources/reference.conf
@@ -1,5 +1,5 @@
-akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
- logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
}
\ No newline at end of file
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
index 6ca00b04..999c5561 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala
@@ -151,7 +151,7 @@ private[benchmarks] trait InflightMetrics {
}
/**
- * Return specified consumer-level metrics using Alpakka Kafka's [[Control]] metrics API.
+ * Return specified consumer-level metrics using Apache Pekko Connectors Kafka's [[Control]] metrics API.
*/
private def consumer[T](control: Control, requests: List[ConsumerMetricRequest])(
implicit ec: ExecutionContext): Future[List[Measurement]] = {
diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
index 149d541b..1952f181 100644
--- a/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
+++ b/cluster-sharding/src/main/scala/org/apache/pekko/kafka/cluster/sharding/KafkaClusterSharding.scala
@@ -33,7 +33,7 @@ import scala.compat.java8.FutureConverters._
/**
* API MAY CHANGE
*
- * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka.
+ * Apache Pekko Extension to enable Apache Pekko Cluster External Sharding with Apache Pekko Connectors Kafka.
*/
@ApiMayChange(issue = "https://github.com/akka/alpakka-kafka/issues/1074")
final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension {
@@ -205,13 +205,13 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
/**
* API MAY CHANGE
*
- * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
+ * Create an Apache Pekko Connectors Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
* used to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member
* the rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy
- * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+ * accordingly so that entities are (eventually) routed to the local Apache Pekko cluster member.
*
- * Returns an Akka typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
- * passed to an Alpakka Kafka [[ConsumerSettings]].
+ * Returns an Apache Pekko typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
+ * passed to an Apache Pekko Connectors Kafka [[ConsumerSettings]].
*
* {{{
* import org.apache.pekko.actor.typed.scaladsl.adapter._
@@ -232,13 +232,13 @@ final class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension
*
* API MAY CHANGE
*
- * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
+ * Create an Apache Pekko Connectors Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is
* used to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member
* the rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy
- * accordingly so that entities are (eventually) routed to the local Akka cluster member.
+ * accordingly so that entities are (eventually) routed to the local Apache Pekko cluster member.
*
- * Returns an Akka typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
- * passed to an Alpakka Kafka [[ConsumerSettings]].
+ * Returns an Apache Pekko typed [[org.apache.pekko.actor.typed.ActorRef]]. This must be converted to a classic actor before it can be
+ * passed to an Apache Pekko Connectors Kafka [[ConsumerSettings]].
*
* {{{
* import org.apache.pekko.actor.typed.scaladsl.adapter._
@@ -304,7 +304,7 @@ object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] {
val updates = shardAllocationClient.updateShardLocations(partitions.map { tp =>
val shardId = tp.partition().toString
- // the Kafka partition number becomes the akka shard id
+ // the Kafka partition number becomes the Apache Pekko shard id
(shardId, address)
}.toMap)
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 83fa291d..efd7a5e2 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -1,15 +1,15 @@
# // #producer-settings
-# Properties for akka.kafka.ProducerSettings can be
+# Properties for org.apache.pekko.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
-akka.kafka.producer {
- # Config path of Akka Discovery method
- # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
- discovery-method = akka.discovery
+pekko.kafka.producer {
+ # Config path of Apache Pekko Discovery method
+ # "pekko.discovery" to use the Apache Pekko Discovery method configured for the ActorSystem
+ discovery-method = pekko.discovery
- # Set a service name for use with Akka Discovery
- # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
+ # Set a service name for use with Apache Pekko Discovery
+ # https://pekko.apache.org/docs/pekko-connectors-kafka/current/discovery.html
service-name = ""
# Timeout for getting a reply from the discovery-method lookup
@@ -30,7 +30,7 @@ akka.kafka.producer {
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
# will be used.
- use-dispatcher = "akka.kafka.default-dispatcher"
+ use-dispatcher = "pekko.kafka.default-dispatcher"
# The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
# for exactly-once-semantics processing.
@@ -44,16 +44,16 @@ akka.kafka.producer {
# // #producer-settings
# // #consumer-settings
-# Properties for akka.kafka.ConsumerSettings can be
+# Properties for org.apache.pekko.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
-akka.kafka.consumer {
- # Config path of Akka Discovery method
- # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
- discovery-method = akka.discovery
+pekko.kafka.consumer {
+ # Config path of Apache Pekko Discovery method
+ # "pekko.discovery" to use the Apache Pekko Discovery method configured for the ActorSystem
+ discovery-method = pekko.discovery
- # Set a service name for use with Akka Discovery
- # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
+ # Set a service name for use with Apache Pekko Discovery
+ # https://pekko.apache.org/docs/pekko-connectors-kafka/current/discovery.html
service-name = ""
# Timeout for getting a reply from the discovery-method lookup
@@ -92,7 +92,7 @@ akka.kafka.consumer {
# Fully qualified config path which holds the dispatcher configuration
# to be used by the KafkaConsumerActor. Some blocking may occur.
- use-dispatcher = "akka.kafka.default-dispatcher"
+ use-dispatcher = "pekko.kafka.default-dispatcher"
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# can be defined in this configuration section.
@@ -111,7 +111,7 @@ akka.kafka.consumer {
# call to Kafka's API
offset-for-times-timeout = 5s
- # Timeout for akka.kafka.Metadata requests
+ # Timeout for org.apache.pekko.kafka.Metadata requests
# This value is used instead of Kafka's default from `default.api.timeout.ms`
# which is 1 minute.
metadata-request-timeout = 5s
@@ -147,7 +147,7 @@ akka.kafka.consumer {
# then causes the Kafka consumer to follow its normal 'auto.offset.reset' behavior. For 'earliest', these settings
# allow the client to detect and attempt to recover from this issue. For 'none' and 'latest', these settings will
# only add overhead. See
- # https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#unexpected-consumer-offset-reset
+ # https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#unexpected-consumer-offset-reset
# for more information
offset-reset-protection {
# turns on reset protection
@@ -162,10 +162,10 @@ akka.kafka.consumer {
# // #consumer-settings
# // #committer-settings
-# Properties for akka.kafka.CommitterSettings can be
+# Properties for org.apache.pekko.kafka.CommitterSettings can be
# defined in this section or a configuration section with
# the same layout.
-akka.kafka.committer {
+pekko.kafka.committer {
# Maximum number of messages in a single commit batch
max-batch = 1000
@@ -192,7 +192,7 @@ akka.kafka.committer {
# The dispatcher that will be used by default by consumer and
# producer stages.
-akka.kafka.default-dispatcher {
+pekko.kafka.default-dispatcher {
type = "Dispatcher"
executor = "thread-pool-executor"
diff --git a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
index 00e2e3e0..4b8ecaa2 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/CommitterSettings.scala
@@ -98,27 +98,27 @@ object CommitWhen {
object CommitterSettings {
- val configPath = "akka.kafka.committer"
+ val configPath = "pekko.kafka.committer"
/**
* Create settings from the default configuration
- * `akka.kafka.committer`.
+ * `pekko.kafka.committer`.
*/
def apply(actorSystem: org.apache.pekko.actor.ActorSystem): CommitterSettings =
apply(actorSystem.settings.config.getConfig(configPath))
/**
* Create settings from the default configuration
- * `akka.kafka.committer`.
+ * `pekko.kafka.committer`.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `org.apache.pekko.actor.typed` API.
*/
def apply(actorSystem: org.apache.pekko.actor.ClassicActorSystemProvider): CommitterSettings =
apply(actorSystem.classicSystem.settings.config.getConfig(configPath))
/**
* Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.committer`.
+ * the default configuration `pekko.kafka.committer`.
*/
def apply(config: Config): CommitterSettings = {
val maxBatch = config.getLong("max-batch")
@@ -131,23 +131,23 @@ object CommitterSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.committer`.
+ * `pekko.kafka.committer`.
*/
def create(actorSystem: org.apache.pekko.actor.ActorSystem): CommitterSettings =
apply(actorSystem)
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.committer`.
+ * `pekko.kafka.committer`.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `org.apache.pekko.actor.typed` API.
*/
def create(actorSystem: org.apache.pekko.actor.ClassicActorSystemProvider): CommitterSettings =
apply(actorSystem)
/**
* Java API: Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.committer`.
+ * the default configuration `pekko.kafka.committer`.
*/
def create(config: Config): CommitterSettings =
apply(config)
@@ -155,7 +155,7 @@ object CommitterSettings {
}
/**
- * Settings for committer. See `akka.kafka.committer` section in
+ * Settings for committer. See `pekko.kafka.committer` section in
* reference.conf. Note that the [[org.apache.pekko.kafka.CommitterSettings$ companion]] object provides
* `apply` and `create` functions for convenient construction of the settings, together with
* the `with` methods.
@@ -195,7 +195,7 @@ class CommitterSettings private (
new CommitterSettings(maxBatch, maxInterval, parallelism, delivery, when)
override def toString: String =
- "akka.kafka.CommitterSettings(" +
+ "org.apache.pekko.kafka.CommitterSettings(" +
s"maxBatch=$maxBatch," +
s"maxInterval=${maxInterval.toCoarsest}," +
s"parallelism=$parallelism," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
index 2745ab51..77f8da3f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConnectionCheckerSettings.scala
@@ -42,7 +42,7 @@ class ConnectionCheckerSettings private[kafka] (val enable: Boolean,
copy(checkInterval = checkInterval.asScala)
override def toString: String =
- s"akka.kafka.ConnectionCheckerSettings(" +
+ s"org.apache.pekko.kafka.ConnectionCheckerSettings(" +
s"enable=$enable," +
s"maxRetries=$maxRetries," +
s"checkInterval=${checkInterval.toCoarsest}," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
index 113aed60..18eac4fd 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ConsumerSettings.scala
@@ -23,11 +23,11 @@ import scala.concurrent.duration._
object ConsumerSettings {
- val configPath = "akka.kafka.consumer"
+ val configPath = "pekko.kafka.consumer"
/**
* Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
@@ -40,10 +40,10 @@ object ConsumerSettings {
/**
* Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `org.apache.pekko.actor.typed` API.
*/
def apply[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -53,7 +53,7 @@ object ConsumerSettings {
/**
* Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.consumer`.
+ * the default configuration `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
@@ -113,7 +113,7 @@ object ConsumerSettings {
/**
* Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
@@ -124,10 +124,10 @@ object ConsumerSettings {
/**
* Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `org.apache.pekko.actor.typed` API.
*/
def apply[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -137,7 +137,7 @@ object ConsumerSettings {
/**
* Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.consumer`.
+ * the default configuration `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
@@ -148,7 +148,7 @@ object ConsumerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
@@ -159,10 +159,10 @@ object ConsumerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `org.apache.pekko.actor.typed` API.
*/
def create[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -172,7 +172,7 @@ object ConsumerSettings {
/**
* Java API: Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.consumer`.
+ * the default configuration `pekko.kafka.consumer`.
* Key or value deserializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
@@ -183,7 +183,7 @@ object ConsumerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
@@ -194,10 +194,10 @@ object ConsumerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.consumer`.
+ * `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `org.apache.pekko.actor.typed` API.
*/
def create[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -207,7 +207,7 @@ object ConsumerSettings {
/**
* Java API: Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.consumer`.
+ * the default configuration `pekko.kafka.consumer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
@@ -227,7 +227,7 @@ object ConsumerSettings {
}
/**
- * Settings for consumers. See `akka.kafka.consumer` section in
+ * Settings for consumers. See `pekko.kafka.consumer` section in
* `reference.conf`. Note that the [[org.apache.pekko.kafka.ConsumerSettings$ companion]] object provides
* `apply` and `create` functions for convenient construction of the settings, together with
* the `with` methods.
@@ -623,7 +623,7 @@ class ConsumerSettings[K, V] @InternalApi private[kafka] (
}
.sortBy(_._1)
.mkString(",")
- "akka.kafka.ConsumerSettings(" +
+ "org.apache.pekko.kafka.ConsumerSettings(" +
s"properties=$kafkaClients," +
s"keyDeserializer=$keyDeserializerOpt," +
s"valueDeserializer=$valueDeserializerOpt," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
index 1cba0cd4..dcd70420 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/OffsetResetProtectionSettings.scala
@@ -53,7 +53,7 @@ class OffsetResetProtectionSettings @InternalApi private[kafka] (val enable: Boo
copy(timeThreshold = timeThreshold.asScala)
override def toString: String =
- s"akka.kafka.OffsetResetProtectionSettings(" +
+ s"org.apache.pekko.kafka.OffsetResetProtectionSettings(" +
s"enable=$enable," +
s"offsetThreshold=$offsetThreshold," +
s"timeThreshold=${timeThreshold.toCoarsest}" +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
index de49a5b6..f447d336 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/ProducerSettings.scala
@@ -24,11 +24,11 @@ import scala.compat.java8.FutureConverters._
object ProducerSettings {
- val configPath = "akka.kafka.producer"
+ val configPath = "pekko.kafka.producer"
/**
* Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
@@ -39,10 +39,10 @@ object ProducerSettings {
/**
* Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `pekko.actor.typed` API.
*/
def apply[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -52,7 +52,7 @@ object ProducerSettings {
/**
* Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.producer`.
+ * the default configuration `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def apply[K, V](
@@ -88,7 +88,7 @@ object ProducerSettings {
/**
* Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
@@ -99,10 +99,10 @@ object ProducerSettings {
/**
* Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `pekko.actor.typed` API.
*/
def apply[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -112,7 +112,7 @@ object ProducerSettings {
/**
* Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.producer`.
+ * the default configuration `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*/
def apply[K, V](
@@ -123,7 +123,7 @@ object ProducerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
@@ -134,10 +134,10 @@ object ProducerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `pekko.actor.typed` API.
*/
def create[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -147,7 +147,7 @@ object ProducerSettings {
/**
* Java API: Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.producer`.
+ * the default configuration `pekko.kafka.producer`.
* Key or value serializer can be passed explicitly or retrieved from configuration.
*/
def create[K, V](
@@ -158,7 +158,7 @@ object ProducerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
@@ -169,10 +169,10 @@ object ProducerSettings {
/**
* Java API: Create settings from the default configuration
- * `akka.kafka.producer`.
+ * `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*
- * For use with the `akka.actor.typed` API.
+ * For use with the `pekko.actor.typed` API.
*/
def create[K, V](
system: org.apache.pekko.actor.ClassicActorSystemProvider,
@@ -182,7 +182,7 @@ object ProducerSettings {
/**
* Java API: Create settings from a configuration with the same layout as
- * the default configuration `akka.kafka.producer`.
+ * the default configuration `pekko.kafka.producer`.
* Key and value serializer must be passed explicitly.
*/
def create[K, V](
@@ -201,7 +201,7 @@ object ProducerSettings {
}
/**
- * Settings for producers. See `akka.kafka.producer` section in
+ * Settings for producers. See `pekko.kafka.producer` section in
* reference.conf. Note that the [[org.apache.pekko.kafka.ProducerSettings$ companion]] object provides
* `apply` and `create` functions for convenient construction of the settings, together with
* the `with` methods.
@@ -392,7 +392,7 @@ class ProducerSettings[K, V] @InternalApi private[kafka] (
}
.sortBy(_._1)
.mkString(",")
- "akka.kafka.ProducerSettings(" +
+ "org.apache.pekko.kafka.ProducerSettings(" +
s"properties=$kafkaClients," +
s"keySerializer=$keySerializerOpt," +
s"valueSerializer=$valueSerializerOpt," +
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
index 2cc5dd6b..8048c67e 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/ConsumerResetProtection.scala
@@ -109,13 +109,13 @@ object ConsumerResetProtection {
log.warning(
s"Your last commit request $previouslyCommitted is more than the configured threshold from the last" +
s"committed offset ($committed) for $tp. See " +
- "https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#setting-offset-threshold-appropriately for more info.")
+ "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#setting-offset-threshold-appropriately for more info.")
}
log.warning(
s"Dropping offsets for partition $tp - received an offset which is less than allowed $threshold " +
s"from the last requested offset (threshold: $threshold). Seeking to the latest known safe (committed " +
s"or assigned) offset: $committed. See " +
- "https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html#unexpected-consumer-offset-reset" +
+ "https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#unexpected-consumer-offset-reset" +
"for more information.")
consumer ! Seek(Map(tp -> committed.offset()))
None
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
index 761a3bf9..cc7b7ec2 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/LoggingWithId.scala
@@ -18,7 +18,7 @@ private[internal] trait InstanceId {
}
/**
- * Override akka streams [[StageLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
+ * Override Apache Pekko streams [[StageLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
*/
private[internal] trait StageIdLogging extends StageLogging with InstanceId { self: GraphStageLogic =>
private[this] var _log: LoggingAdapter = _
@@ -32,7 +32,7 @@ private[internal] trait StageIdLogging extends StageLogging with InstanceId { se
}
/**
- * Override akka classic [[ActorLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
+ * Override Apache Pekko classic [[ActorLogging]] to include an ID from [[InstanceId]] as a prefix to each logging statement.
*/
private[internal] trait ActorIdLogging extends ActorLogging with InstanceId { this: Actor =>
private[this] var _log: LoggingAdapter = _
diff --git a/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala b/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala
index 8437662c..346e0cf6 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/internal/SourceLogicBuffer.scala
@@ -13,7 +13,7 @@ import org.apache.kafka.common.TopicPartition
* A buffer of messages provided by the [[KafkaConsumerActor]] for a Source Logic. When partitions are rebalanced
* away from this Source Logic preemptively filter out messages for those partitions.
*
- * NOTE: Due to the asynchronous nature of Akka Streams, it's not possible to guarantee that a message has not
+ * NOTE: Due to the asynchronous nature of Apache Pekko Streams, it's not possible to guarantee that a message has not
* already been sent downstream for a revoked partition before the rebalance handler invokes
* `filterRevokedPartitionsCB`. The best we can do is filter as many messages as possible to reduce the amount of
* duplicate messages sent downstream.
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
index 874b40c9..80be91b1 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Consumer.scala
@@ -24,7 +24,7 @@ import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
/**
- * Akka Stream connector for subscribing to Kafka topics.
+ * Apache Pekko Stream connector for subscribing to Kafka topics.
*/
object Consumer {
@@ -168,7 +168,7 @@ object Consumer {
* This is useful when "at-least once delivery" is desired, as each message will likely be
* delivered one time but in failure cases could be duplicated.
*
- * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+ * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Producer.flowWithContext]].
*/
@ApiMayChange
@@ -192,7 +192,7 @@ object Consumer {
* This is useful when "at-least once delivery" is desired, as each message will likely be
* delivered one time but in failure cases could be duplicated.
*
- * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+ * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Producer.flowWithContext]].
*
* This variant makes it possible to add additional metadata (in the form of a string)
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
index 522ed462..dba527ca 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Producer.scala
@@ -18,7 +18,7 @@ import scala.annotation.nowarn
import scala.compat.java8.FutureConverters._
/**
- * Akka Stream connector for publishing messages to Kafka topics.
+ * Apache Pekko Stream connector for publishing messages to Kafka topics.
*/
object Producer {
@@ -210,7 +210,7 @@ object Producer {
*
* - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
*
- * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+ * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
*
* @tparam C the flow context type
*/
@@ -278,7 +278,7 @@ object Producer {
*
* - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
*
- * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+ * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
*
* Supports sharing a Kafka Producer instance.
*
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
index 61696610..75f2a13f 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/SendProducer.scala
@@ -16,7 +16,7 @@ import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import scala.compat.java8.FutureConverters._
/**
- * Utility class for producing to Kafka without using Akka Streams.
+ * Utility class for producing to Kafka without using Apache Pekko Streams.
*/
final class SendProducer[K, V] private (underlying: scaladsl.SendProducer[K, V]) {
@@ -26,7 +26,7 @@ final class SendProducer[K, V] private (underlying: scaladsl.SendProducer[K, V])
this(scaladsl.SendProducer(settings)(system))
/**
- * Utility class for producing to Kafka without using Akka Streams.
+ * Utility class for producing to Kafka without using Apache Pekko Streams.
* @param settings producer settings used to create or access the [[org.apache.kafka.clients.producer.Producer]]
*
* The internal asynchronous operations run on the provided `Executor` (which may be an `ActorSystem`'s dispatcher).
diff --git a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
index 1d13ff39..9b651911 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/javadsl/Transactional.scala
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.compat.java8.FutureConverters.FutureOps
/**
- * Akka Stream connector to support transactions between Kafka topics.
+ * Apache Pekko Stream connector to support transactions between Kafka topics.
*/
object Transactional {
@@ -39,7 +39,7 @@ object Transactional {
/**
* API MAY CHANGE
*
- * This source is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+ * This source is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Transactional.flowWithOffsetContext]].
*/
@ApiMayChange
@@ -125,7 +125,7 @@ object Transactional {
* carries [[ConsumerMessage.PartitionOffset]] as context. The flow requires a unique `transactional.id` across all app
* instances. The flow will override producer properties to enable Kafka exactly-once transactional support.
*
- * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+ * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Transactional.sourceWithOffsetContext]].
*/
@ApiMayChange
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
index 5599f9d4..2a32705d 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Consumer.scala
@@ -20,14 +20,14 @@ import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
/**
- * Akka Stream connector for subscribing to Kafka topics.
+ * Apache Pekko Stream connector for subscribing to Kafka topics.
*/
object Consumer {
/**
* Materialized value of the consumer `Source`.
*
- * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+ * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
*/
trait Control {
@@ -37,7 +37,7 @@ object Consumer {
* already enqueued messages. It does not unsubscribe from any topics/partitions
* as that could trigger a consumer group rebalance.
*
- * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+ * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
*
* Call [[#shutdown]] to close consumer.
*/
@@ -46,11 +46,11 @@ object Consumer {
/**
* Shut down the consumer `Source`.
*
- * The actor backing the source will stay alive for `akka.kafka.consumer.stop-timeout` so that more commits
+ * The actor backing the source will stay alive for `pekko.kafka.consumer.stop-timeout` so that more commits
* from enqueued messages can be handled.
* The actor will wait for acknowledgements of the already sent offset commits from the Kafka broker before shutting down.
*
- * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+ * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
*/
def shutdown(): Future[Done]
@@ -94,7 +94,7 @@ object Consumer {
* one, so that the stream can be stopped in a controlled way without losing
* commits.
*
- * See [[https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
+ * See [[https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#controlled-shutdown Controlled shutdown]]
*/
final class DrainingControl[T] private (control: Control, val streamCompletion: Future[T]) extends Control {
@@ -199,7 +199,7 @@ object Consumer {
* This is useful when "at-least once delivery" is desired, as each message will likely be
* delivered one time but in failure cases could be duplicated.
*
- * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html),
+ * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html),
* [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
*/
@ApiMayChange
@@ -219,7 +219,7 @@ object Consumer {
* This is useful when "at-least once delivery" is desired, as each message will likely be
* delivered one time but in failure cases could be duplicated.
*
- * It is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html),
+ * It is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html),
* [[Producer.flowWithContext]] and/or [[Committer.sinkWithOffsetContext]].
*
* This variant makes it possible to add additional metadata (in the form of a string)
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
index d98030b7..fe027e2a 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/DiscoverySupport.scala
@@ -26,7 +26,7 @@ object DiscoverySupport {
// used for initial discovery of contact points
private def discovery(config: Config, system: ActorSystem): ServiceDiscovery =
config.getString("discovery-method") match {
- case "akka.discovery" =>
+ case "pekko.discovery" =>
Discovery(system).discovery
case otherDiscoveryMechanism =>
@@ -34,7 +34,7 @@ object DiscoverySupport {
}
/**
- * Use Akka Discovery to read the addresses for `serviceName` within `lookupTimeout`.
+ * Use Apache Pekko Discovery to read the addresses for `serviceName` within `lookupTimeout`.
*/
private def bootstrapServers(
discovery: ServiceDiscovery,
@@ -55,7 +55,7 @@ object DiscoverySupport {
/**
* Internal API.
*
- * Expect a `service` section in Config and use Akka Discovery to read the addresses for `name` within `lookup-timeout`.
+ * Expect a `service` section in Config and use Apache Pekko Discovery to read the addresses for `name` within `lookup-timeout`.
*/
@InternalApi
private[kafka] def bootstrapServers(config: Config)(implicit system: ActorSystem): Future[String] = {
@@ -117,7 +117,7 @@ object DiscoverySupport {
system.dynamicAccess.getClassFor("org.apache.pekko.discovery.Discovery$") match {
case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
throw new IllegalStateException(
- s"Pekko Discovery is being used but the `pekko-discovery` library is not on the classpath, it must be added explicitly. See https://pekko.apache.org/docs/pekko/current/discovery/index.html")
+ s"Apache Pekko Discovery is being used but the `pekko-discovery` library is not on the classpath, it must be added explicitly. See https://pekko.apache.org/docs/pekko/current/discovery/index.html")
case _ =>
}
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala
index 3a6f2fda..a5161bea 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Producer.scala
@@ -18,7 +18,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.Future
/**
- * Akka Stream connector for publishing messages to Kafka topics.
+ * Apache Pekko Stream connector for publishing messages to Kafka topics.
*/
object Producer {
@@ -203,7 +203,7 @@ object Producer {
*
* - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
*
- * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+ * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
*
* @tparam C the flow context type
*/
@@ -272,7 +272,7 @@ object Producer {
*
* - [[org.apache.pekko.kafka.ProducerMessage.PassThroughMessage PassThroughMessage]] does not publish anything, and continues in the stream as [[org.apache.pekko.kafka.ProducerMessage.PassThroughResult PassThroughResult]]
*
- * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html).
+ * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html).
*
* Supports sharing a Kafka Producer instance.
*
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
index 58cd7aba..48948507 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/SendProducer.scala
@@ -15,7 +15,7 @@ import org.apache.kafka.clients.producer.{ Callback, ProducerRecord, RecordMetad
import scala.concurrent.{ ExecutionContext, Future, Promise }
/**
- * Utility class for producing to Kafka without using Akka Streams.
+ * Utility class for producing to Kafka without using Apache Pekko Streams.
* @param settings producer settings used to create or access the [[org.apache.kafka.clients.producer.Producer]]
*/
final class SendProducer[K, V] private (val settings: ProducerSettings[K, V], system: ActorSystem) {
diff --git a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
index 526d519f..af864ee2 100644
--- a/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
+++ b/core/src/main/scala/org/apache/pekko/kafka/scaladsl/Transactional.scala
@@ -25,7 +25,7 @@ import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future
/**
- * Akka Stream connector to support transactions between Kafka topics.
+ * Apache Pekko Stream connector to support transactions between Kafka topics.
*/
object Transactional {
@@ -40,7 +40,7 @@ object Transactional {
/**
* API MAY CHANGE
*
- * This source is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+ * This source is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Transactional.flowWithOffsetContext]].
*/
@ApiMayChange
@@ -122,7 +122,7 @@ object Transactional {
* carries [[ConsumerMessage.PartitionOffset]] as context. The flow requires a unique `transactional.id` across all app
* instances. The flow will override producer properties to enable Kafka exactly-once transactional support.
*
- * This flow is intended to be used with Akka's [flow with context](https://doc.akka.io/docs/akka/current/stream/operators/Flow/asFlowWithContext.html)
+ * This flow is intended to be used with Apache Pekko's [flow with context](https://pekko.apache.org/docs/pekko/current/stream/operators/Flow/asFlowWithContext.html)
* and [[Transactional.sourceWithOffsetContext]].
*/
@ApiMayChange
diff --git a/project/ProjectSettings.scala b/project/ProjectSettings.scala
index aecf6f47..20a08996 100644
--- a/project/ProjectSettings.scala
+++ b/project/ProjectSettings.scala
@@ -15,7 +15,7 @@ object ProjectSettings {
|
|The build has three main modules:
| core - the Kafka connector sources
- | cluster-sharding - Akka Cluster External Sharding with the Apache Pekko Kafka Connector
+ | cluster-sharding - Apache Pekko Cluster External Sharding with the Apache Pekko Kafka Connector
| tests - tests, Docker based integration tests, code for the documentation
| testkit - framework for testing the connector
|
@@ -64,7 +64,7 @@ object ProjectSettings {
url("https://github.com/apache/incubator-pekko-connectors-kafka/graphs/contributors")),
startYear := Some(2022),
licenses := Seq("Apache-2.0" -> url("https://opensource.org/licenses/Apache-2.0")),
- description := "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Pekko.",
+ description := "Apache Pekko Kafka Connector is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.",
crossScalaVersions := Seq(Scala213),
scalaVersion := Scala213,
crossVersion := CrossVersion.binary,
@@ -87,7 +87,7 @@ object ProjectSettings {
"-sourcepath",
(ThisBuild / baseDirectory).value.toString,
"-skip-packages",
- "akka.pattern:scala", // for some reason Scaladoc creates this
+ "pekko.pattern:scala", // for some reason Scaladoc creates this
"-doc-source-url", {
val branch = if (isSnapshot.value) "master" else s"v${version.value}"
s"https://github.com/apache/incubator-pekko-connectors-kafka/tree/${branch}€{FILE_PATH_EXT}#L€{FILE_LINE}"
diff --git a/project/VersionGenerator.scala b/project/VersionGenerator.scala
index c8abf39e..104302a2 100644
--- a/project/VersionGenerator.scala
+++ b/project/VersionGenerator.scala
@@ -2,19 +2,19 @@ import sbt._
import sbt.Keys._
/**
- * Generate version.conf and akka/kafka/Version.scala files based on the version setting.
+ * Generate version.conf and org/apache/pekko/kafka/Version.scala files based on the version setting.
*
- * This was adapted from https://github.com/akka/akka/blob/v2.6.8/project/VersionGenerator.scala
+ * This was adapted from https://github.com/apache/incubator-pekko/blob/main/project/VersionGenerator.scala
*/
object VersionGenerator {
val settings: Seq[Setting[_]] = inConfig(Compile)(
Seq(
- resourceGenerators += generateVersion(resourceManaged, _ / "version.conf", """|akka.kafka.version = "%s"
+ resourceGenerators += generateVersion(resourceManaged, _ / "version.conf", """|pekko.kafka.version = "%s"
|"""),
sourceGenerators += generateVersion(
sourceManaged,
- _ / "akka" / "kafka" / "Version.scala",
+ _ / "org" / "apache" / "pekko" / "kafka" / "Version.scala",
"""|package org.apache.pekko.kafka
|
|object Version {
diff --git a/project/Versions.scala b/project/Versions.scala
index ba78eccb..c380ed4d 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -16,7 +16,7 @@ object Versions {
// Keep .scala-steward.conf pin in sync
val kafkaVersion = "3.0.1"
val KafkaVersionForDocs = "30"
- // This should align with the ScalaTest version used in the Akka 2.6.x testkit
+ // This should align with the ScalaTest version used in the Apache Pekko 1.0.x testkit
// https://github.com/apache/incubator-pekko/blob/main/project/Dependencies.scala#L70
val scalaTestVersion = "3.1.4"
val testcontainersVersion = "1.16.3"
diff --git a/project/project-info.conf b/project/project-info.conf
index 5b14426b..5ef8ad9a 100644
--- a/project/project-info.conf
+++ b/project/project-info.conf
@@ -12,7 +12,7 @@ project-info {
text: "Github issues"
}
release-notes: {
- url: "https://doc.akka.io/docs/alpakka-kafka/current/release-notes/index.html"
+ url: "https://pekko.apache.org/docs/pekko-connectors-kafka/current/release-notes/index.html"
text: "In the documentation"
new-tab: false
}
diff --git a/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java b/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java
index 2763a122..0867b297 100644
--- a/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java
+++ b/testkit/src/main/java/org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.java
@@ -43,7 +43,7 @@ public class KafkaContainerCluster implements Startable {
public static final Duration DEFAULT_CLUSTER_START_TIMEOUT = Duration.ofSeconds(360);
public static final Duration DEFAULT_READINESS_CHECK_TIMEOUT = DEFAULT_CLUSTER_START_TIMEOUT;
- private static final String LOGGING_NAMESPACE_PREFIX = "akka.kafka.testkit.testcontainers.logs";
+ private static final String LOGGING_NAMESPACE_PREFIX = "pekko.kafka.testkit.testcontainers.logs";
private static final String READINESS_CHECK_SCRIPT = "/testcontainers_readiness_check.sh";
private static final String READINESS_CHECK_TOPIC = "ready-kafka-container-cluster";
private static final Version BOOTSTRAP_PARAM_MIN_VERSION = new Version("5.2.0");
diff --git a/testkit/src/main/resources/reference.conf b/testkit/src/main/resources/reference.conf
index 5142a855..988945e2 100644
--- a/testkit/src/main/resources/reference.conf
+++ b/testkit/src/main/resources/reference.conf
@@ -1,5 +1,5 @@
# // #testkit-settings
-akka.kafka.testkit {
+pekko.kafka.testkit {
# amount of time to wait until the desired cluster state is reached
cluster-timeout = 10 seconds
@@ -13,18 +13,18 @@ akka.kafka.testkit {
# // #testkit-settings
# // #testkit-testcontainers-settings
-akka.kafka.testkit.testcontainers {
+pekko.kafka.testkit.testcontainers {
# define these settings to select a different Kafka/ZooKeeper docker image
# we recommend using Confluent Platform docker images and using the same version across all images
# Confluent publishes images on DockerHub: https://hub.docker.com/r/confluentinc/cp-kafka/tags
# Kafka versions in Confluent Platform: https://docs.confluent.io/current/installation/versions-interoperability.html
zookeeper-image = "confluentinc/cp-zookeeper"
- zookeeper-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
+ zookeeper-image-tag = ${pekko.kafka.testkit.testcontainers.confluent-platform-version}
kafka-image = "confluentinc/cp-kafka"
- kafka-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
+ kafka-image-tag = ${pekko.kafka.testkit.testcontainers.confluent-platform-version}
schema-registry-image = "confluentinc/cp-schema-registry"
- schema-registry-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
+ schema-registry-image-tag = ${pekko.kafka.testkit.testcontainers.confluent-platform-version}
# See https://docs.confluent.io/platform/current/installation/versions-interoperability.html
confluent-platform-version = "7.0.0"
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala
index bdf541e1..e10d6b7e 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitSettings.scala
@@ -31,7 +31,7 @@ class KafkaTestkitSettings private (val clusterTimeout: FiniteDuration,
}
object KafkaTestkitSettings {
- final val ConfigPath = "akka.kafka.testkit"
+ final val ConfigPath = "pekko.kafka.testkit"
/**
* Create testkit settings from ActorSystem settings.
diff --git a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
index f9394f97..a7786dd9 100644
--- a/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
+++ b/testkit/src/main/scala/org/apache/pekko/kafka/testkit/KafkaTestkitTestcontainersSettings.scala
@@ -283,7 +283,7 @@ final class KafkaTestkitTestcontainersSettings private (
}
object KafkaTestkitTestcontainersSettings {
- final val ConfigPath = "akka.kafka.testkit.testcontainers"
+ final val ConfigPath = "pekko.kafka.testkit.testcontainers"
/**
* Create testkit testcontainers settings from ActorSystem settings.
diff --git a/tests/src/it/resources/reference.conf b/tests/src/it/resources/reference.conf
index 13c73da6..da2d68f5 100644
--- a/tests/src/it/resources/reference.conf
+++ b/tests/src/it/resources/reference.conf
@@ -1,7 +1,7 @@
-akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
+pekko {
+ loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
- logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+ logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
logger-startup-timeout = 15s
actor {
@@ -14,7 +14,7 @@ akka {
test {
# https://github.com/akka/alpakka-kafka/pull/994
timefactor = 3.0
- timefactor = ${?AKKA_TEST_TIMEFACTOR}
+ timefactor = ${?PEKKO_TEST_TIMEFACTOR}
single-expect-default = 10s
}
diff --git a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
index b2f07c30..2a7ad93f 100644
--- a/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
+++ b/tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
@@ -86,7 +86,7 @@ class ConsumerExampleTest extends TestcontainersKafkaTest {
}
// #settings
- final Config config = system.settings().config().getConfig("akka.kafka.consumer");
+ final Config config = system.settings().config().getConfig("pekko.kafka.consumer");
final ConsumerSettings<String, byte[]> consumerSettings =
ConsumerSettings.create(config, new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers("localhost:9092")
diff --git a/tests/src/test/java/docs/javadsl/ProducerTest.java b/tests/src/test/java/docs/javadsl/ProducerTest.java
index 1eada873..ae3094ad 100644
--- a/tests/src/test/java/docs/javadsl/ProducerTest.java
+++ b/tests/src/test/java/docs/javadsl/ProducerTest.java
@@ -68,7 +68,7 @@ class ProducerTest extends TestcontainersKafkaTest {
void createProducer() {
// #producer
// #settings
- final Config config = system.settings().config().getConfig("akka.kafka.producer");
+ final Config config = system.settings().config().getConfig("pekko.kafka.producer");
final ProducerSettings<String, String> producerSettings =
ProducerSettings.create(config, new StringSerializer(), new StringSerializer())
.withBootstrapServers("localhost:9092");
diff --git a/tests/src/test/resources/application.conf b/tests/src/test/resources/application.conf
index 7bfceefc..55132734 100644
--- a/tests/src/test/resources/application.conf
+++ b/tests/src/test/resources/application.conf
@@ -11,12 +11,12 @@ pekko {
test {
# https://github.com/akka/alpakka-kafka/pull/994
timefactor = 3.0
- timefactor = ${?AKKA_TEST_TIMEFACTOR}
+ timefactor = ${?PEKKO_TEST_TIMEFACTOR}
single-expect-default = 10s
}
}
-akka {
+pekko {
kafka {
consumer {
stop-timeout = 10ms
@@ -30,10 +30,10 @@ akka {
}
# default is 10 seconds
-# akka.kafka.testkit.consumer-group-timeout = 20 seconds
+# pekko.kafka.testkit.consumer-group-timeout = 20 seconds
# #consumer-config-inheritance
-our-kafka-consumer: ${akka.kafka.consumer} {
+our-kafka-consumer: ${pekko.kafka.consumer} {
kafka-clients {
bootstrap.servers = "kafka-host:9092"
}
diff --git a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
index 1a0ed5db..46b01f5c 100644
--- a/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ClusterShardingExample.scala
@@ -53,7 +53,7 @@ object ClusterShardingExample {
// #message-extractor
// #setup-cluster-sharding
- // create an Akka Cluster Sharding `EntityTypeKey` for `User` for this Kafka Consumer Group
+ // create an Apache Pekko Cluster Sharding `EntityTypeKey` for `User` for this Kafka Consumer Group
val groupId = "user-topic-group-id"
val typeKey = EntityTypeKey[User](groupId)
@@ -69,11 +69,11 @@ object ClusterShardingExample {
// #setup-cluster-sharding
// #rebalance-listener
- // obtain an Akka classic ActorRef that will handle consumer group rebalance events
+ // obtain an Apache Pekko typed ActorRef that will handle consumer group rebalance events
val rebalanceListener: org.apache.pekko.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system.toClassic).rebalanceListener(typeKey)
- // convert the rebalance listener to a classic ActorRef until Alpakka Kafka supports Akka Typed
+ // convert the rebalance listener to a classic ActorRef until Apache Pekko Connectors Kafka supports Apache Pekko Typed
import org.apache.pekko.actor.typed.scaladsl.adapter._
val rebalanceListenerClassic: org.apache.pekko.actor.ActorRef = rebalanceListener.toClassic
diff --git a/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala b/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
index a22abd85..1c816eb8 100644
--- a/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ConsumerExample.scala
@@ -84,7 +84,7 @@ class ConsumerExample extends DocsSpecBase with TestcontainersKafkaLike {
def createSettings(): ConsumerSettings[String, Array[Byte]] = {
// #settings
- val config = system.settings.config.getConfig("akka.kafka.consumer")
+ val config = system.settings.config.getConfig("pekko.kafka.consumer")
val consumerSettings =
ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(bootstrapServers)
diff --git a/tests/src/test/scala/docs/scaladsl/ProducerExample.scala b/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
index 5899fbbc..3bfb5f38 100644
--- a/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
+++ b/tests/src/test/scala/docs/scaladsl/ProducerExample.scala
@@ -27,7 +27,7 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
"Creating a producer" should "work" in {
// #producer
// #settings
- val config = system.settings.config.getConfig("akka.kafka.producer")
+ val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
@@ -42,7 +42,7 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
}
"PlainSink" should "work" in assertAllStagesStopped {
- val config = system.settings.config.getConfig("akka.kafka.producer")
+ val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
@@ -95,7 +95,7 @@ class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike {
}
"Metrics" should "be observed" in assertAllStagesStopped {
- val config = system.settings.config.getConfig("akka.kafka.producer")
+ val config = system.settings.config.getConfig("pekko.kafka.producer")
val producerSettings =
ProducerSettings(config, new StringSerializer, new StringSerializer)
.withBootstrapServers(bootstrapServers)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
index f0f08c7c..4e616dc4 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/ConsumerSettingsSpec.scala
@@ -30,11 +30,11 @@ class ConsumerSettingsSpec
"handle nested kafka-clients properties" in {
val conf = ConfigFactory.parseString("""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.bootstrap.foo = baz
- akka.kafka.consumer.kafka-clients.foo = bar
- akka.kafka.consumer.kafka-clients.client.id = client1
- """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.bootstrap.foo = baz
+ pekko.kafka.consumer.kafka-clients.foo = bar
+ pekko.kafka.consumer.kafka-clients.client.id = client1
+ """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new StringDeserializer)
settings.getProperty("bootstrap.servers") should ===("localhost:9092")
settings.getProperty("client.id") should ===("client1")
@@ -46,22 +46,22 @@ class ConsumerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.client.id = client1
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.consumer")
+ .getConfig("pekko.kafka.consumer")
val settings = ConsumerSettings(conf, None, None)
settings.getProperty("bootstrap.servers") should ===("localhost:9092")
}
"handle deserializers passed as args config" in {
val conf = ConfigFactory.parseString("""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.parallelism = 1
- """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.parallelism = 1
+ """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new StringDeserializer)
settings.getProperty("bootstrap.servers") should ===("localhost:9092")
}
@@ -70,12 +70,12 @@ class ConsumerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.client.id = client1
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.consumer")
+ .getConfig("pekko.kafka.consumer")
val settings = ConsumerSettings(conf, Some(new ByteArrayDeserializer), None)
settings.getProperty("bootstrap.servers") should ===("localhost:9092")
}
@@ -84,18 +84,18 @@ class ConsumerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.client.id = client1
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.consumer")
+ .getConfig("pekko.kafka.consumer")
val settings = ConsumerSettings(conf, None, Some(new ByteArrayDeserializer))
settings.getProperty("bootstrap.servers") should ===("localhost:9092")
}
"filter passwords from kafka-clients properties" in {
- val conf = ConfigFactory.load().getConfig("akka.kafka.consumer")
+ val conf = ConfigFactory.load().getConfig("pekko.kafka.consumer")
val settings = ConsumerSettings(conf, new ByteArrayDeserializer, new StringDeserializer)
.withProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "hemligt")
.withProperty("ssl.truststore.password", "geheim")
@@ -110,12 +110,12 @@ class ConsumerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.client.id = client1
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.consumer")
+ .getConfig("pekko.kafka.consumer")
val exception = intercept[IllegalArgumentException] {
ConsumerSettings(conf, None, None)
}
@@ -125,9 +125,9 @@ class ConsumerSettingsSpec
"throw IllegalArgumentException if no value deserializer defined (null case). Key serializer passed as args config" in {
val conf = ConfigFactory.parseString("""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.client.id = client1
- """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.client.id = client1
+ """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
val exception = intercept[IllegalArgumentException] {
ConsumerSettings(conf, new ByteArrayDeserializer, null)
}
@@ -139,12 +139,12 @@ class ConsumerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.client.id = client1
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.consumer")
+ .getConfig("pekko.kafka.consumer")
val exception = intercept[IllegalArgumentException] {
ConsumerSettings(conf, None, null)
}
@@ -156,12 +156,12 @@ class ConsumerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.client.id = client1
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.consumer")
+ .getConfig("pekko.kafka.consumer")
val exception = intercept[IllegalArgumentException] {
ConsumerSettings(conf, None, None)
}
@@ -171,9 +171,9 @@ class ConsumerSettingsSpec
"throw IllegalArgumentException if no key deserializer defined (null case). Value serializer passed as args config" in {
val conf = ConfigFactory.parseString("""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.client.id = client1
- """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.consumer")
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.client.id = client1
+ """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.consumer")
val exception = intercept[IllegalArgumentException] {
ConsumerSettings(conf, null, new ByteArrayDeserializer)
}
@@ -185,12 +185,12 @@ class ConsumerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
- akka.kafka.consumer.kafka-clients.client.id = client1
+ pekko.kafka.consumer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.consumer.kafka-clients.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
+ pekko.kafka.consumer.kafka-clients.client.id = client1
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.consumer")
+ .getConfig("pekko.kafka.consumer")
val exception = intercept[IllegalArgumentException] {
ConsumerSettings(conf, null, None)
}
@@ -244,7 +244,7 @@ object ConsumerSettingsSpec {
val DiscoveryConfigSection =
s"""
// #discovery-service
- discovery-consumer: $${akka.kafka.consumer} {
+ discovery-consumer: $${pekko.kafka.consumer} {
service-name = "kafkaService1"
resolve-timeout = 10 ms
}
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala
index 9da53069..31f85eec 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/ProducerSettingsSpec.scala
@@ -30,22 +30,22 @@ class ProducerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
- akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.producer")
+ .getConfig("pekko.kafka.producer")
val settings = ProducerSettings(conf, None, None)
settings.properties("bootstrap.servers") should ===("localhost:9092")
}
"handle serializers passed as args config" in {
val conf = ConfigFactory.parseString("""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.producer")
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.producer")
val settings = ProducerSettings(conf, new ByteArraySerializer, new StringSerializer)
settings.properties("bootstrap.servers") should ===("localhost:9092")
}
@@ -54,12 +54,12 @@ class ProducerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.producer")
+ .getConfig("pekko.kafka.producer")
val settings = ProducerSettings(conf, Some(new ByteArraySerializer), None)
settings.properties("bootstrap.servers") should ===("localhost:9092")
}
@@ -68,12 +68,12 @@ class ProducerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.producer")
+ .getConfig("pekko.kafka.producer")
val settings = ProducerSettings(conf, None, Some(new ByteArraySerializer))
settings.properties("bootstrap.servers") should ===("localhost:9092")
}
@@ -94,12 +94,12 @@ class ProducerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.producer")
+ .getConfig("pekko.kafka.producer")
val exception = intercept[IllegalArgumentException] {
ProducerSettings(conf, None, None)
}
@@ -109,9 +109,9 @@ class ProducerSettingsSpec
"throw IllegalArgumentException if no value serializer defined (null case). Key serializer passed as args config" in {
val conf = ConfigFactory.parseString("""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.producer")
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.producer")
val exception = intercept[IllegalArgumentException] {
ProducerSettings(conf, new ByteArraySerializer, null)
}
@@ -123,12 +123,12 @@ class ProducerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- akka.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ pekko.kafka.producer.kafka-clients.key.serializer = org.apache.kafka.common.serialization.StringSerializer
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.producer")
+ .getConfig("pekko.kafka.producer")
val exception = intercept[IllegalArgumentException] {
ProducerSettings(conf, None, null)
}
@@ -140,12 +140,12 @@ class ProducerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.producer")
+ .getConfig("pekko.kafka.producer")
val exception = intercept[IllegalArgumentException] {
ProducerSettings(conf, None, None)
}
@@ -155,9 +155,9 @@ class ProducerSettingsSpec
"throw IllegalArgumentException if no key serializer defined (null case). Value serializer passed as args config" in {
val conf = ConfigFactory.parseString("""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- """).withFallback(ConfigFactory.load()).getConfig("akka.kafka.producer")
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ """).withFallback(ConfigFactory.load()).getConfig("pekko.kafka.producer")
val exception = intercept[IllegalArgumentException] {
ProducerSettings(conf, null, new ByteArraySerializer)
}
@@ -169,12 +169,12 @@ class ProducerSettingsSpec
val conf = ConfigFactory
.parseString(
"""
- akka.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
- akka.kafka.producer.kafka-clients.parallelism = 1
- akka.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
+ pekko.kafka.producer.kafka-clients.bootstrap.servers = "localhost:9092"
+ pekko.kafka.producer.kafka-clients.parallelism = 1
+ pekko.kafka.producer.kafka-clients.value.serializer = org.apache.kafka.common.serialization.StringSerializer
""")
.withFallback(ConfigFactory.load())
- .getConfig("akka.kafka.producer")
+ .getConfig("pekko.kafka.producer")
val exception = intercept[IllegalArgumentException] {
ProducerSettings(conf, null, None)
}
@@ -231,7 +231,7 @@ object ProducerSettingsSpec {
val DiscoveryConfigSection =
s"""
// #discovery-service
- discovery-producer: $${akka.kafka.producer} {
+ discovery-producer: $${pekko.kafka.producer} {
service-name = "kafkaService1"
resolve-timeout = 10 ms
}
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
index 099c28c6..ebbd2ab5 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/CommittingWithMockSpec.scala
@@ -68,7 +68,7 @@ class CommittingWithMockSpec(_system: ActorSystem)
this(
ActorSystem("CommittingWithMockSpec",
ConfigFactory
- .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+ .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
.withFallback(ConfigFactory.load())))
override def afterAll(): Unit =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
index 37f66b39..40cdc909 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ConsumerSpec.scala
@@ -64,7 +64,7 @@ class ConsumerSpec(_system: ActorSystem)
this(
ActorSystem("ConsumerSpec",
ConfigFactory
- .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+ .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
.withFallback(ConfigFactory.load())))
override def afterAll(): Unit =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
index 1e1537b4..1c211cc6 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/PartitionedSourceSpec.scala
@@ -50,7 +50,7 @@ class PartitionedSourceSpec(_system: ActorSystem)
this(
ActorSystem("PartitionedSourceSpec",
ConfigFactory
- .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+ .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
.withFallback(ConfigFactory.load())))
override def afterAll(): Unit =
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
index 6af86887..b3e8a8f1 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/internal/ProducerSpec.scala
@@ -49,7 +49,7 @@ class ProducerSpec(_system: ActorSystem)
this(
ActorSystem("ProducerSpec",
ConfigFactory
- .parseString("""akka.stream.materializer.debug.fuzzing-mode = on""")
+ .parseString("""pekko.stream.materializer.debug.fuzzing-mode = on""")
.withFallback(ConfigFactory.load())))
override def afterAll(): Unit = shutdown(system)
@@ -200,7 +200,7 @@ class ProducerSpec(_system: ActorSystem)
source.sendError(sourceError)
// Here we can not be sure that all messages from source delivered to producer
- // because of buffers in akka-stream and faster error pushing that ignores buffers
+ // because of buffers in pekko-stream and faster error pushing that ignores buffers
sink.expectError(sourceError)
client.verifyClosed()
@@ -546,7 +546,7 @@ class ProducerSpec(_system: ActorSystem)
source.sendError(new Exception())
// Here we can not be sure that all messages from source delivered to producer
- // because of buffers in akka-stream and faster error pushing that ignores buffers
+ // because of buffers in pekko-stream and faster error pushing that ignores buffers
// TODO: we can await a tx to be initialized before sending the error (which means the producer was assigned and first msg processed). does that invalidate this test?
Await.ready(sink, remainingOrDefault)
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
index 671adfaf..029b9fc8 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/IntegrationSpec.scala
@@ -235,7 +235,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside
Await.result(produce(topic, 1 to 100), remainingOrDefault)
// Create ConsumerActor manually
- // https://doc.akka.io/docs/akka-stream-kafka/0.20/consumer.html#sharing-kafkaconsumer
+ // https://pekko.apache.org/docs/pekko-connectors-kafka/current/consumer.html#sharing-kafkaconsumer
val consumer = system.actorOf(KafkaConsumerActor.props(consumerDefaults.withGroupId(group)))
// Timeout for metadata fetching requests
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
index b1876576..4f31a761 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/scaladsl/RebalanceSpec.scala
@@ -239,13 +239,13 @@ object PekkoConnectorsAssignor {
* client id so that we can filter them during assignment. The member id is a concatenation of the client id and the
* group member instance id that's generated by the Consumer Group coordinator.
*
- * Pass a client.id -> Set[TopicPartition] map to `AlpakkaAssignor.clientIdToPartitionMap` **before** you anticipate a
+ * Pass a client.id -> Set[TopicPartition] map to `PekkoConnectorsAssignor.clientIdToPartitionMap` **before** you anticipate a
* rebalance to occur in your test.
*/
class PekkoConnectorsAssignor extends AbstractPartitionAssignor {
val log: Logger = LoggerFactory.getLogger(getClass)
- override def name(): String = "alpakka-test"
+ override def name(): String = "pekko-connector-kafka-test"
override def assign(
partitionsPerTopic: util.Map[String, Integer],
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
index 93156a79..ef44b47b 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/CapturingAppender.scala
@@ -10,7 +10,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.AppenderBase
/**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
*
* INTERNAL API
*/
@@ -34,13 +34,13 @@ import ch.qos.logback.core.AppenderBase
}
/**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
*
* INTERNAL API
*
* Logging from tests can be silenced by this appender. When there is a test failure
* the captured logging events are flushed to the appenders defined for the
- * org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
+ * org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
*
* The flushing on test failure is handled by [[org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing]]
* for ScalaTest and [[org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing]] for JUnit.
@@ -48,9 +48,9 @@ import ch.qos.logback.core.AppenderBase
* Use configuration like the following in the logback-test.xml:
*
* {{{
- * <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
+ * <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />
*
- * <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
+ * <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
* <appender-ref ref="STDOUT"/>
* </logger>
*
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala
index 00d24454..7009c22c 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/LogbackUtil.scala
@@ -10,7 +10,7 @@ import ch.qos.logback.classic.Level
import org.slf4j.LoggerFactory
/**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
*
* INTERNAL API
*/
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala
index 50efff7a..25cfa7f3 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/javadsl/LogCapturingJunit4.scala
@@ -14,7 +14,7 @@ import org.junit.runners.model.Statement
import org.slf4j.LoggerFactory
/**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
*
* JUnit `TestRule` to make log lines appear only when the test failed.
*
@@ -26,9 +26,9 @@ import org.slf4j.LoggerFactory
* Requires Logback and configuration like the following in the logback-test.xml:
*
* {{{
- * <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
+ * <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />
*
- * <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
+ * <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
* <appender-ref ref="STDOUT"/>
* </logger>
*
diff --git a/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala b/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala
index 701308cc..3236b825 100644
--- a/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala
+++ b/tests/src/test/scala/org/apache/pekko/kafka/tests/scaladsl/LogCapturing.scala
@@ -15,16 +15,16 @@ import org.scalatest.TestSuite
import org.slf4j.LoggerFactory
/**
- * See https://doc.akka.io/docs/akka/current/typed/testing-async.html#silence-logging-output-from-tests
+ * See https://pekko.apache.org/docs/pekko/current/typed/testing-async.html#silence-logging-output-from-tests
*
* Mixin this trait to a ScalaTest test to make log lines appear only when the test failed.
*
* Requires Logback and configuration like the following in the logback-test.xml:
*
* {{{
- * <appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
+ * <appender name="CapturingAppender" class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender" />
*
- * <logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
+ * <logger name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate" >
* <appender-ref ref="STDOUT"/>
* </logger>
*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org