You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2023/03/17 19:50:24 UTC

[incubator-pekko-samples] branch main updated: Issue 243: Convert pekko-samples `cluster-client` java/scala to pekko (#4)

This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git


The following commit(s) were added to refs/heads/main by this push:
     new b285aa1  Issue 243: Convert pekko-samples `cluster-client` java/scala to pekko (#4)
b285aa1 is described below

commit b285aa12650740b90595a1db405c33b0a467c09b
Author: Sam Byng <43...@users.noreply.github.com>
AuthorDate: Fri Mar 17 19:50:18 2023 +0000

    Issue 243: Convert pekko-samples `cluster-client` java/scala to pekko (#4)
    
    * sambyng/gh-243: Change cluster-client-grpc dirs to have pekko prefix
    
    * Convert to pekko-sample-cluster-client-grpc-java
    
    update pom.xml:
    
    Update versions to pekko version
    Update jar dependencies with com.typesafe.akka to pekko groupID and artifactID. NO change to lightbend groupID items yet
    
    GH-243: Update pom.xml based on assumption that all com.lightbend.akka.<package> group IDs publish to org.apache.pekko
    
    I checked the lightbend.akka.grpc project and those packages are going to org.apache.pekko so assuming this is the case for all
    
    Update pekko-sample-cluster-client-grpc-java source files to import pekko
    
    * Convert pekko-sample-cluster-client-grpc-scala
    
    update build.sbt
    
    Update plugins list for pekko-sample-cluster-client-grpc-scala
    
    - Note: sbt-pekko-grpc is the correct plugin name, because the grpc project build defines the plugin with
    name = sbt-"$pekkoPrefix" [where pekkoPrefix=pekko-grpc]
    
    Update application.conf to point at pekko
    
    Update pekko-sample-cluster-client-grpc-scala source code files to point at pekko
    
    * Update documentation: update URLs to point to pekko.apache.org/docs/pekko/current; update READMEs to refer to Pekko
    
    NOTE: No change to lightbend URLs yet, since equivalent docs still need to be found
    
    * In source code, Configs use pekko prefix instead of akka
    
    * Update build-test.yml to point at new dirnames
    
    * #243: Update build.sbt files to have a resolver to find the Pekko snapshots in the Apache repository
    
    * Update docs Pekko and Cassandra references to have Apache prefix
    
    * Add resolvers to pom.xml / build.sbt files for cluster-client
    
    * Update Artery remoting ports 25251/25252 to 17356/17357
    
    * Issue 243: cluster-client-grpc-java update pom.xml to use the grpc-maven-plugin, pulling from pluginRepository
    
    * typo
    
    * fix one of the 2 compile issues
    
    * compilation fix
    
    ---------
    
    Co-authored-by: sb5 <sb...@ad.datcon.co.uk>
    Co-authored-by: PJ Fanning <pj...@users.noreply.github.com>
    Co-authored-by: Seetaramayya <35...@users.noreply.github.com>
---
 .github/workflows/build-test.yml                   |   4 +-
 akka-sample-cluster-client-grpc-java/pom.xml       |  99 ---------------
 .../client/grpc/ClusterClientReceptionist.java     | 131 -------------------
 .../LICENSE                                        |   0
 .../README.md                                      |   4 +-
 pekko-sample-cluster-client-grpc-java/pom.xml      | 121 ++++++++++++++++++
 .../cluster/client/grpc/CborSerializable.java      |   0
 .../sample/cluster/client/grpc/ClusterClient.java  |  36 +++---
 .../client/grpc/ClusterClientReceptionist.java     | 138 +++++++++++++++++++++
 .../grpc/ClusterClientReceptionistGrpcImpl.java    |  16 +--
 .../client/grpc/ClusterClientSerialization.java    |  10 +-
 .../cluster/client/grpc/ClusterClientSettings.java |   4 +-
 .../client/grpc/ClusterReceptionistSettings.java   |   2 +-
 .../src/main/protobuf/clusterclient.proto          |   0
 .../src/main/resources/application.conf            |   4 +-
 .../cluster/client/grpc/ClusterClientTest.java     |  28 ++---
 16 files changed, 313 insertions(+), 284 deletions(-)

diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 8e2e90b..e287eae 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -31,8 +31,8 @@ jobs:
       - name: Test pekko-sample-cluster-client-grpc-scala
         run: cd pekko-sample-cluster-client-grpc-scala && sbt multi-jvm:test
 
-      - name: Test akka-sample-cluster-client-grpc-java
-        run: cd akka-sample-cluster-client-grpc-java && mvn test
+      - name: Test pekko-sample-cluster-client-grpc-java
+        run: cd pekko-sample-cluster-client-grpc-java && mvn test
 
       - name: Test pekko-sample-distributed-data-java
         run: cd pekko-sample-distributed-data-java && sbt multi-jvm:test
diff --git a/akka-sample-cluster-client-grpc-java/pom.xml b/akka-sample-cluster-client-grpc-java/pom.xml
deleted file mode 100644
index 62579cd..0000000
--- a/akka-sample-cluster-client-grpc-java/pom.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
-                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <akka.version>2.6.20</akka.version>
-    <akka.grpc.version>0.8.4</akka.grpc.version>
-  </properties>
-
-  <groupId>com.lightbend.akka.samples</groupId>
-  <artifactId>akka-sample-cluster-client-grpc-java</artifactId>
-  <packaging>jar</packaging>
-  <version>empty</version>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-stream_2.13</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-cluster_2.13</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-cluster-tools_2.13</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-serialization-jackson_2.13</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-discovery_2.13</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.lightbend.akka.grpc</groupId>
-      <artifactId>akka-grpc-runtime_2.13</artifactId>
-      <version>${akka.grpc.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.typesafe.akka</groupId>
-      <artifactId>akka-testkit_2.13</artifactId>
-      <version>${akka.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.13.1</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.8.1</version>
-        <configuration>
-          <source>1.8</source>
-          <target>1.8</target>
-          <fork>true</fork>
-          <compilerArgs>
-            <arg>-Xlint</arg>
-            <arg>-parameters</arg>
-          </compilerArgs>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>com.lightbend.akka.grpc</groupId>
-        <artifactId>akka-grpc-maven-plugin</artifactId>
-        <version>${akka.grpc.version}</version>
-        <configuration>
-          <language>Java</language>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>generate</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-    </plugins>
-  </build>
-
-</project>
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java b/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
deleted file mode 100644
index 8ceda91..0000000
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package sample.cluster.client.grpc;
-
-import akka.actor.AbstractExtensionId;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.ExtendedActorSystem;
-import akka.actor.Extension;
-import akka.actor.ExtensionIdProvider;
-import akka.cluster.Cluster;
-import akka.cluster.pubsub.DistributedPubSub;
-import akka.cluster.pubsub.DistributedPubSubMediator;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.http.javadsl.ConnectHttp;
-import akka.http.javadsl.Http;
-import akka.stream.Materializer;
-import akka.stream.SystemMaterializer;
-
-import java.util.Optional;
-
-class ClusterClientReceptionistExtension extends AbstractExtensionId<ClusterClientReceptionist>
-  implements ExtensionIdProvider {
-  public static final ClusterClientReceptionistExtension INSTANCE = new ClusterClientReceptionistExtension();
-
-  private ClusterClientReceptionistExtension() {}
-
-  public ClusterClientReceptionistExtension lookup() {
-    return ClusterClientReceptionistExtension.INSTANCE;
-  }
-
-  public ClusterClientReceptionist createExtension(ExtendedActorSystem system) {
-    return new ClusterClientReceptionist(system);
-  }
-}
-
-/**
- * Extension that starts gRPC service and accompanying `akka.cluster.pubsub.DistributedPubSubMediator`
- * with settings defined in config section `sample.cluster.client.grpc.receptionist`.
- * The `akka.cluster.pubsub.DistributedPubSubMediator` is started by the `akka.cluster.pubsub.DistributedPubSub`
- * extension.
- */
-final class ClusterClientReceptionist implements Extension {
-
-  public static ClusterClientReceptionist get(ActorSystem system) {
-    return ClusterClientReceptionistExtension.INSTANCE.get(system);
-  }
-
-  private final ExtendedActorSystem system;
-  private final LoggingAdapter log;
-  public final ClusterReceptionistSettings settings;
-  private final Optional<String> role;
-
-  public ClusterClientReceptionist(ExtendedActorSystem system) {
-    this.system = system;
-    this.log = Logging.getLogger(system, getClass());
-    this.settings = ClusterReceptionistSettings.create(system);
-    this.role = settings.role;
-
-    log.info("Starting ClusterClientReceptionist gRPC server at {}", settings.hostPort);
-
-    ClusterClientSerialization serialization = new ClusterClientSerialization(system);
-
-    Materializer materializer = SystemMaterializer.get(system).materializer();
-
-    Http.get(system).bindAndHandleAsync(
-      ClusterClientReceptionistServiceHandlerFactory.create(
-        new ClusterClientReceptionistGrpcImpl(settings, pubSubMediator(), serialization, materializer, log),
-        system),
-      ConnectHttp.toHost(settings.hostPort.hostname, settings.hostPort.port),
-      materializer)
-      .whenComplete((result, exc) -> {
-        if (exc == null)
-          log.info("ClusterClientReceptionist gRPC server stopped successfully");
-        else
-          log.info("ClusterClientReceptionist gRPC server stopped after failure: {}", exc);
-      });
-  }
-
-  /**
-   * Returns true if this member is not tagged with the role configured for the
-   * receptionist.
-   */
-  public boolean isTerminated() {
-    return Cluster.get(system).isTerminated() ||
-      (role.isPresent() && !Cluster.get(system).getSelfRoles().contains(role.get()));
-  }
-
-  /**
-   * Register the actors that should be reachable for the clients in this `DistributedPubSubMediator`.
-   */
-  private ActorRef pubSubMediator() {
-    return DistributedPubSub.get(system).mediator();
-  }
-
-  /**
-   * Register an actor that should be reachable for the clients.
-   * The clients can send messages to this actor with `Send` or `SendToAll` using
-   * the path elements of the `ActorRef`, e.g. `"/user/myservice"`.
-   */
-  public void registerService(ActorRef actor) {
-    pubSubMediator().tell(new DistributedPubSubMediator.Put(actor), ActorRef.noSender());
-  }
-
-  /**
-   * A registered actor will be automatically unregistered when terminated,
-   * but it can also be explicitly unregistered before termination.
-   */
-  public void unregisterService(ActorRef actor) {
-    pubSubMediator().tell(new DistributedPubSubMediator.Remove(actor.path().toStringWithoutAddress()), ActorRef.noSender());
-  }
-
-  /**
-   * Register an actor that should be reachable for the clients to a named topic.
-   * Several actors can be registered to the same topic name, and all will receive
-   * published messages.
-   * The client can publish messages to this topic with `Publish`.
-   */
-  public void registerSubscriber(String topic, ActorRef actor) {
-    pubSubMediator().tell(new DistributedPubSubMediator.Subscribe(topic, actor), ActorRef.noSender());
-  }
-
-  /**
-   * A registered subscriber will be automatically unregistered when terminated,
-   * but it can also be explicitly unregistered before termination.
-   */
-  public void unregisterSubscriber(String topic, ActorRef actor) {
-    pubSubMediator().tell(new DistributedPubSubMediator.Unsubscribe(topic, actor), ActorRef.noSender());
-  }
-
-}
-
diff --git a/akka-sample-cluster-client-grpc-java/LICENSE b/pekko-sample-cluster-client-grpc-java/LICENSE
similarity index 100%
rename from akka-sample-cluster-client-grpc-java/LICENSE
rename to pekko-sample-cluster-client-grpc-java/LICENSE
diff --git a/akka-sample-cluster-client-grpc-java/README.md b/pekko-sample-cluster-client-grpc-java/README.md
similarity index 72%
rename from akka-sample-cluster-client-grpc-java/README.md
rename to pekko-sample-cluster-client-grpc-java/README.md
index 355456e..9e0ee7f 100644
--- a/akka-sample-cluster-client-grpc-java/README.md
+++ b/pekko-sample-cluster-client-grpc-java/README.md
@@ -1,7 +1,7 @@
 # Cluster Client with gRPC transport
 	
 See purpose of this example and important clarifications of when to use this approach in
-[Migration to Akka gRPC](https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-apache-pekko-grpc).
+[Migration to Apache Pekko gRPC](https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-apache-pekko-grpc).
 
 Project structure:
 
@@ -10,5 +10,5 @@ Project structure:
 * [ClusterClient](src/main/java/sample/cluster/client/grpc/ClusterClient.java) is the actor on the client
   side that messages are sent via
 * [ClusterClientReceptionist](src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java)
-  is an Akka extension on the cluster (server) side that implements the gRPC service and delegates
+  is an Apache Pekko extension on the cluster (server) side that implements the gRPC service and delegates
   messages to actors in the cluster that have been registered in Distributed PubSub. 
diff --git a/pekko-sample-cluster-client-grpc-java/pom.xml b/pekko-sample-cluster-client-grpc-java/pom.xml
new file mode 100644
index 0000000..cbdb0e4
--- /dev/null
+++ b/pekko-sample-cluster-client-grpc-java/pom.xml
@@ -0,0 +1,121 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.pekko</groupId>
+  <artifactId>pekko-sample-cluster-client-grpc-java</artifactId>
+  <packaging>jar</packaging>
+  <version>empty</version>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <pekko.version>0.0.0+26617-325e2156-SNAPSHOT</pekko.version>
+    <pekko.grpc.version>0.0.0-15-3d8bff9d-SNAPSHOT</pekko.grpc.version>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>apache-pekko-snapshots</id>
+      <name>Apache Snapshots Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <layout>default</layout>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+  <pluginRepositories>
+    <pluginRepository>
+      <id>apache-pekko-snapshots</id>
+      <name>Apache Snapshots Repository</name>
+      <url>https://repository.apache.org/content/repositories/snapshots/</url>
+      <layout>default</layout>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+    </pluginRepository>
+  </pluginRepositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pekko</groupId>
+      <artifactId>pekko-stream_2.13</artifactId>
+      <version>${pekko.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pekko</groupId>
+      <artifactId>pekko-cluster_2.13</artifactId>
+      <version>${pekko.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pekko</groupId>
+      <artifactId>pekko-cluster-tools_2.13</artifactId>
+      <version>${pekko.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pekko</groupId>
+      <artifactId>pekko-serialization-jackson_2.13</artifactId>
+      <version>${pekko.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pekko</groupId>
+      <artifactId>pekko-discovery_2.13</artifactId>
+      <version>${pekko.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pekko</groupId>
+      <artifactId>pekko-grpc-runtime_2.13</artifactId>
+      <version>${pekko.grpc.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pekko</groupId>
+      <artifactId>pekko-testkit_2.13</artifactId>
+      <version>${pekko.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.13.1</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.8.1</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+          <fork>true</fork>
+          <compilerArgs>
+            <arg>-Xlint</arg>
+            <arg>-parameters</arg>
+          </compilerArgs>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.pekko</groupId>
+        <artifactId>pekko-grpc-maven-plugin</artifactId>
+        <version>${pekko.grpc.version}</version>
+        <configuration>
+          <language>Java</language>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>generate</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+</project>
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java
similarity index 100%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
index fbbfa0c..c62be6f 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
@@ -1,19 +1,19 @@
 package sample.cluster.client.grpc;
 
-import akka.NotUsed;
-import akka.actor.AbstractLoggingActor;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.event.LoggingAdapter;
-import akka.japi.pf.ReceiveBuilder;
-import akka.pattern.Patterns;
-import akka.stream.KillSwitches;
-import akka.stream.Materializer;
-import akka.stream.OverflowStrategy;
-import akka.stream.SharedKillSwitch;
-import akka.stream.WatchedActorTerminatedException;
-import akka.stream.javadsl.Source;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.AbstractLoggingActor;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.actor.Props;
+import org.apache.pekko.actor.Terminated;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.japi.pf.ReceiveBuilder;
+import org.apache.pekko.pattern.Patterns;
+import org.apache.pekko.stream.KillSwitches;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.OverflowStrategy;
+import org.apache.pekko.stream.SharedKillSwitch;
+import org.apache.pekko.stream.WatchedActorTerminatedException;
+import org.apache.pekko.stream.javadsl.Source;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletionStage;
 /**
  * This actor is intended to be used on an external node that is not member
  * of the cluster. It acts like a gateway for sending messages to actors
- * somewhere in the cluster. With service discovery and Akka gRPC it will establish
+ * somewhere in the cluster. With service discovery and Apache Pekko gRPC it will establish
  * a connection to a {@link ClusterClientReceptionist} somewhere in the cluster.
  * <p>
  * You can send messages via the `ClusterClient` to any actor in the cluster
@@ -48,7 +48,7 @@ import java.util.concurrent.CompletionStage;
  * to the named topic.
  * <p>
  * Use the factory method {@link ClusterClient#props ClusterClient.props}) to create the
- * `akka.actor.Props` for the actor.
+ * `org.apache.pekko.actor.Props` for the actor.
  * <p>
  * If the receptionist is not currently available, the client will buffer the messages
  * and then deliver them when the connection to the receptionist has been established.
@@ -62,7 +62,7 @@ import java.util.concurrent.CompletionStage;
 public class ClusterClient extends AbstractLoggingActor {
 
   /**
-   * Factory method for `ClusterClient` `akka.actor.Props`.
+   * Factory method for `ClusterClient` `org.apache.pekko.actor.Props`.
    */
   public static Props props(ClusterClientSettings settings, Materializer materializer) {
     return Props.create(ClusterClient.class, () -> new ClusterClient(settings, materializer));
@@ -133,7 +133,7 @@ public class ClusterClient extends AbstractLoggingActor {
 
   private static ClusterClientReceptionistServiceClient createClientStub(ClusterClientSettings settings,
       Materializer mat) {
-    return ClusterClientReceptionistServiceClient.create(settings.grpcClientSettings,mat, mat.executionContext());
+    return ClusterClientReceptionistServiceClient.create(settings.grpcClientSettings, mat.system());
   }
 
   private static CompletionStage<ActorRef> newSession(
diff --git a/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
new file mode 100644
index 0000000..bcece3e
--- /dev/null
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
@@ -0,0 +1,138 @@
+package sample.cluster.client.grpc;
+
+import org.apache.pekko.actor.AbstractExtensionId;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.actor.ExtendedActorSystem;
+import org.apache.pekko.actor.Extension;
+import org.apache.pekko.actor.ExtensionIdProvider;
+import org.apache.pekko.cluster.Cluster;
+import org.apache.pekko.cluster.pubsub.DistributedPubSub;
+import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
+import org.apache.pekko.event.Logging;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.http.javadsl.ConnectHttp;
+import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.http.javadsl.ServerBinding;
+import org.apache.pekko.http.javadsl.model.HttpRequest;
+import org.apache.pekko.http.javadsl.model.HttpResponse;
+import org.apache.pekko.japi.function.Function;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.SystemMaterializer;
+
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
+class ClusterClientReceptionistExtension extends AbstractExtensionId<ClusterClientReceptionist>
+        implements ExtensionIdProvider {
+    public static final ClusterClientReceptionistExtension INSTANCE = new ClusterClientReceptionistExtension();
+
+    private ClusterClientReceptionistExtension() {
+    }
+
+    public ClusterClientReceptionistExtension lookup() {
+        return ClusterClientReceptionistExtension.INSTANCE;
+    }
+
+    public ClusterClientReceptionist createExtension(ExtendedActorSystem system) {
+        return new ClusterClientReceptionist(system);
+    }
+}
+
+/**
+ * Extension that starts gRPC service and accompanying `org.apache.pekko.cluster.pubsub.DistributedPubSubMediator`
+ * with settings defined in config section `sample.cluster.client.grpc.receptionist`.
+ * The `org.apache.pekko.cluster.pubsub.DistributedPubSubMediator` is started by the `org.apache.pekko.cluster.pubsub.DistributedPubSub`
+ * extension.
+ */
+final class ClusterClientReceptionist implements Extension {
+
+    public static ClusterClientReceptionist get(ActorSystem system) {
+        return ClusterClientReceptionistExtension.INSTANCE.get(system);
+    }
+
+    private final ExtendedActorSystem system;
+    private final LoggingAdapter log;
+    public final ClusterReceptionistSettings settings;
+    private final Optional<String> role;
+
+    public ClusterClientReceptionist(ExtendedActorSystem system) {
+        this.system = system;
+        this.log = Logging.getLogger(system, getClass());
+        this.settings = ClusterReceptionistSettings.create(system);
+        this.role = settings.role;
+
+        log.info("Starting ClusterClientReceptionist gRPC server at {}", settings.hostPort);
+
+        ClusterClientSerialization serialization = new ClusterClientSerialization(system);
+
+        Materializer materializer = SystemMaterializer.get(system).materializer();
+
+        Function<HttpRequest, CompletionStage<HttpResponse>> service = ClusterClientReceptionistServiceHandlerFactory.create(
+                new ClusterClientReceptionistGrpcImpl(settings, pubSubMediator(), serialization, materializer, log),
+                system);
+        CompletionStage<ServerBinding> bound =
+                Http.get(system)
+                        .newServerAt(settings.hostPort.hostname, settings.hostPort.port)
+                        .bind(service)
+                        .whenComplete((result, exc) -> {
+                            if (exc == null)
+                                log.info("ClusterClientReceptionist gRPC server stopped successfully");
+                            else
+                                log.info("ClusterClientReceptionist gRPC server stopped after failure: {}", exc);
+                        });
+    }
+
+    /**
+     * Returns true if this member is not tagged with the role configured for the
+     * receptionist.
+     */
+    public boolean isTerminated() {
+        return Cluster.get(system).isTerminated() ||
+                (role.isPresent() && !Cluster.get(system).getSelfRoles().contains(role.get()));
+    }
+
+    /**
+     * Register the actors that should be reachable for the clients in this `DistributedPubSubMediator`.
+     */
+    private ActorRef pubSubMediator() {
+        return DistributedPubSub.get(system).mediator();
+    }
+
+    /**
+     * Register an actor that should be reachable for the clients.
+     * The clients can send messages to this actor with `Send` or `SendToAll` using
+     * the path elements of the `ActorRef`, e.g. `"/user/myservice"`.
+     */
+    public void registerService(ActorRef actor) {
+        pubSubMediator().tell(new DistributedPubSubMediator.Put(actor), ActorRef.noSender());
+    }
+
+    /**
+     * A registered actor will be automatically unregistered when terminated,
+     * but it can also be explicitly unregistered before termination.
+     */
+    public void unregisterService(ActorRef actor) {
+        pubSubMediator().tell(new DistributedPubSubMediator.Remove(actor.path().toStringWithoutAddress()), ActorRef.noSender());
+    }
+
+    /**
+     * Register an actor that should be reachable for the clients to a named topic.
+     * Several actors can be registered to the same topic name, and all will receive
+     * published messages.
+     * The client can publish messages to this topic with `Publish`.
+     */
+    public void registerSubscriber(String topic, ActorRef actor) {
+        pubSubMediator().tell(new DistributedPubSubMediator.Subscribe(topic, actor), ActorRef.noSender());
+    }
+
+    /**
+     * A registered subscriber will be automatically unregistered when terminated,
+     * but it can also be explicitly unregistered before termination.
+     */
+    public void unregisterSubscriber(String topic, ActorRef actor) {
+        pubSubMediator().tell(new DistributedPubSubMediator.Unsubscribe(topic, actor), ActorRef.noSender());
+    }
+
+}
+
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
index 07c010f..71bca70 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
@@ -1,13 +1,13 @@
 package sample.cluster.client.grpc;
 
-import akka.NotUsed;
-import akka.actor.ActorRef;
-import akka.cluster.pubsub.DistributedPubSubMediator;
-import akka.event.LoggingAdapter;
-import akka.pattern.Patterns;
-import akka.stream.Materializer;
-import akka.stream.OverflowStrategy;
-import akka.stream.javadsl.Source;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.pattern.Patterns;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.OverflowStrategy;
+import org.apache.pekko.stream.javadsl.Source;
 
 import java.util.Optional;
 import java.util.UUID;
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
similarity index 79%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
index 38c23bd..3781852 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
@@ -1,10 +1,10 @@
 package sample.cluster.client.grpc;
 
-import akka.actor.ActorSystem;
-import akka.serialization.Serialization;
-import akka.serialization.SerializationExtension;
-import akka.serialization.Serializer;
-import akka.serialization.Serializers;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.serialization.Serialization;
+import org.apache.pekko.serialization.SerializationExtension;
+import org.apache.pekko.serialization.Serializer;
+import org.apache.pekko.serialization.Serializers;
 import com.google.protobuf.ByteString;
 
 class ClusterClientSerialization {
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
index b010621..d497313 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
@@ -1,7 +1,7 @@
 package sample.cluster.client.grpc;
 
-import akka.actor.ActorSystem;
-import akka.grpc.GrpcClientSettings;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.grpc.GrpcClientSettings;
 import com.typesafe.config.Config;
 
 import java.time.Duration;
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
similarity index 97%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
index f34e8bb..93d2d32 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
@@ -1,6 +1,6 @@
 package sample.cluster.client.grpc;
 
-import akka.actor.ActorSystem;
+import org.apache.pekko.actor.ActorSystem;
 import com.typesafe.config.Config;
 
 import java.time.Duration;
diff --git a/akka-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto b/pekko-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto
similarity index 100%
rename from akka-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto
rename to pekko-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto
diff --git a/akka-sample-cluster-client-grpc-java/src/main/resources/application.conf b/pekko-sample-cluster-client-grpc-java/src/main/resources/application.conf
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/resources/application.conf
rename to pekko-sample-cluster-client-grpc-java/src/main/resources/application.conf
index 3cbbe70..f46d234 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/resources/application.conf
+++ b/pekko-sample-cluster-client-grpc-java/src/main/resources/application.conf
@@ -23,10 +23,10 @@ sample.cluster.client.grpc {
   buffer-size = 1000
 }
 
-akka.actor {
+pekko.actor {
   serialization-bindings {
     "sample.cluster.client.grpc.CborSerializable" = jackson-cbor
   }
 }
 
-akka.http.server.preview.enable-http2 = on
+pekko.http.server.preview.enable-http2 = on
diff --git a/akka-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java b/pekko-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
similarity index 93%
rename from akka-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
rename to pekko-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
index b0ef6a8..97bed5b 100644
--- a/akka-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
+++ b/pekko-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
@@ -1,17 +1,17 @@
 package sample.cluster.client.grpc;
 
 
-import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.cluster.Cluster;
-import akka.cluster.pubsub.DistributedPubSub;
-import akka.cluster.pubsub.DistributedPubSubMediator;
-import akka.japi.pf.ReceiveBuilder;
-import akka.pattern.Patterns;
-import akka.stream.Materializer;
-import akka.testkit.javadsl.TestKit;
+import org.apache.pekko.actor.AbstractActor;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.actor.Props;
+import org.apache.pekko.cluster.Cluster;
+import org.apache.pekko.cluster.pubsub.DistributedPubSub;
+import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
+import org.apache.pekko.japi.pf.ReceiveBuilder;
+import org.apache.pekko.pattern.Patterns;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.testkit.javadsl.TestKit;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.junit.AfterClass;
@@ -47,8 +47,8 @@ public class ClusterClientTest {
 
   private static Config clusterConfig(int grpcPort) {
    return  ConfigFactory.parseString(
-        "akka.actor.provider = cluster \n" +
-          "akka.remote.artery.canonical.port = 0 \n" +
+        "pekko.actor.provider = cluster \n" +
+          "pekko.remote.artery.canonical.port = 0 \n" +
           "sample.cluster.client.grpc.receptionist.canonical.port = " + grpcPort + " \n" +
           "").withFallback(ConfigFactory.load());
   }
@@ -57,7 +57,7 @@ public class ClusterClientTest {
   public static void setup() {
     Config clientConfig =
       ConfigFactory.parseString(
-        "akka.actor.provider = local \n" +
+        "pekko.actor.provider = local \n" +
           "").withFallback(ConfigFactory.load());
 
     clientNode = ActorSystem.create("ClusterClientTest");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org