You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by fa...@apache.org on 2023/03/17 19:50:24 UTC
[incubator-pekko-samples] branch main updated: Issue 243: Convert pekko-samples `cluster-client` java/scala to pekko (#4)
This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
The following commit(s) were added to refs/heads/main by this push:
new b285aa1 Issue 243: Convert pekko-samples `cluster-client` java/scala to pekko (#4)
b285aa1 is described below
commit b285aa12650740b90595a1db405c33b0a467c09b
Author: Sam Byng <43...@users.noreply.github.com>
AuthorDate: Fri Mar 17 19:50:18 2023 +0000
Issue 243: Convert pekko-samples `cluster-client` java/scala to pekko (#4)
* sambyng/gh-243: Change cluster-client-grpc dirs to have pekko prefix
* Convert to pekko-sample-cluster-client-grpc-java
update pom.xml:
Update versions to pekko version
Update jar dependencies with com.typesafe.akka to pekko groupID and artifactID. NO change to lightbend groupID items yet
GH-243: Update pom.xml based on assumption that all com.lightbend.akka.<package> group IDs publish to org.apache.pekko
I checked the lightbend.akka.grpc project and those packages are going to org.apache.pekko so assuming this is the case for all
Update pekko-sample-cluster-client-grpc-java source files to import pekko instead of akka
* Convert pekko-sample-cluster-client-grpc-scala
update build.sbt
Update plugins list for pekko-sample-cluster-client-grpc-scala
- Note: sbt-pekko-grpc is the correct naming convention, since in the grpc project build the plugin is named
sbt-"$pekkoPrefix" [where pekkoPrefix=pekko-grpc]
Update application.conf to point at pekko
Update pekko-sample-cluster-client-grpc-scala source code files to point at pekko
* Update documentation: Update URLs to point to pekko.apache.org/docs/pekko/current ; Update READMEs to refer to Pekko ;
NOTE: No change to lightbend URLs yet, since equivalent docs still need to be found
* In source code, Configs use pekko prefix instead of akka
* Update build-test.yml to point at new dirnames
* #243: Update build.sbt files to have a resolver to find the Pekko snapshots in the Apache repository
* Update docs Pekko and Cassandra references to have Apache prefix
* Add resolvers to pom.xml / build.sbt files for cluster-client
* Update Artery remoting ports 25251/25252 to 17356/17357
* Issue 243: cluster-client-grpc-java update pom.xml to use the grpc-maven-plugin, pulling from pluginRepository
* typo
* fix one of the 2 compile issues
* compilation fix
---------
Co-authored-by: sb5 <sb...@ad.datcon.co.uk>
Co-authored-by: PJ Fanning <pj...@users.noreply.github.com>
Co-authored-by: Seetaramayya <35...@users.noreply.github.com>
---
.github/workflows/build-test.yml | 4 +-
akka-sample-cluster-client-grpc-java/pom.xml | 99 ---------------
.../client/grpc/ClusterClientReceptionist.java | 131 -------------------
.../LICENSE | 0
.../README.md | 4 +-
pekko-sample-cluster-client-grpc-java/pom.xml | 121 ++++++++++++++++++
.../cluster/client/grpc/CborSerializable.java | 0
.../sample/cluster/client/grpc/ClusterClient.java | 36 +++---
.../client/grpc/ClusterClientReceptionist.java | 138 +++++++++++++++++++++
.../grpc/ClusterClientReceptionistGrpcImpl.java | 16 +--
.../client/grpc/ClusterClientSerialization.java | 10 +-
.../cluster/client/grpc/ClusterClientSettings.java | 4 +-
.../client/grpc/ClusterReceptionistSettings.java | 2 +-
.../src/main/protobuf/clusterclient.proto | 0
.../src/main/resources/application.conf | 4 +-
.../cluster/client/grpc/ClusterClientTest.java | 28 ++---
16 files changed, 313 insertions(+), 284 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 8e2e90b..e287eae 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -31,8 +31,8 @@ jobs:
- name: Test pekko-sample-cluster-client-grpc-scala
run: cd pekko-sample-cluster-client-grpc-scala && sbt multi-jvm:test
- - name: Test akka-sample-cluster-client-grpc-java
- run: cd akka-sample-cluster-client-grpc-java && mvn test
+ - name: Test pekko-sample-cluster-client-grpc-java
+ run: cd pekko-sample-cluster-client-grpc-java && mvn test
- name: Test pekko-sample-distributed-data-java
run: cd pekko-sample-distributed-data-java && sbt multi-jvm:test
diff --git a/akka-sample-cluster-client-grpc-java/pom.xml b/akka-sample-cluster-client-grpc-java/pom.xml
deleted file mode 100644
index 62579cd..0000000
--- a/akka-sample-cluster-client-grpc-java/pom.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <akka.version>2.6.20</akka.version>
- <akka.grpc.version>0.8.4</akka.grpc.version>
- </properties>
-
- <groupId>com.lightbend.akka.samples</groupId>
- <artifactId>akka-sample-cluster-client-grpc-java</artifactId>
- <packaging>jar</packaging>
- <version>empty</version>
-
- <dependencies>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-stream_2.13</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster_2.13</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-cluster-tools_2.13</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-serialization-jackson_2.13</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-discovery_2.13</artifactId>
- <version>${akka.version}</version>
- </dependency>
- <dependency>
- <groupId>com.lightbend.akka.grpc</groupId>
- <artifactId>akka-grpc-runtime_2.13</artifactId>
- <version>${akka.grpc.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-testkit_2.13</artifactId>
- <version>${akka.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.13.1</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <fork>true</fork>
- <compilerArgs>
- <arg>-Xlint</arg>
- <arg>-parameters</arg>
- </compilerArgs>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>com.lightbend.akka.grpc</groupId>
- <artifactId>akka-grpc-maven-plugin</artifactId>
- <version>${akka.grpc.version}</version>
- <configuration>
- <language>Java</language>
- </configuration>
- <executions>
- <execution>
- <goals>
- <goal>generate</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
- </build>
-
-</project>
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java b/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
deleted file mode 100644
index 8ceda91..0000000
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package sample.cluster.client.grpc;
-
-import akka.actor.AbstractExtensionId;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.ExtendedActorSystem;
-import akka.actor.Extension;
-import akka.actor.ExtensionIdProvider;
-import akka.cluster.Cluster;
-import akka.cluster.pubsub.DistributedPubSub;
-import akka.cluster.pubsub.DistributedPubSubMediator;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.http.javadsl.ConnectHttp;
-import akka.http.javadsl.Http;
-import akka.stream.Materializer;
-import akka.stream.SystemMaterializer;
-
-import java.util.Optional;
-
-class ClusterClientReceptionistExtension extends AbstractExtensionId<ClusterClientReceptionist>
- implements ExtensionIdProvider {
- public static final ClusterClientReceptionistExtension INSTANCE = new ClusterClientReceptionistExtension();
-
- private ClusterClientReceptionistExtension() {}
-
- public ClusterClientReceptionistExtension lookup() {
- return ClusterClientReceptionistExtension.INSTANCE;
- }
-
- public ClusterClientReceptionist createExtension(ExtendedActorSystem system) {
- return new ClusterClientReceptionist(system);
- }
-}
-
-/**
- * Extension that starts gRPC service and accompanying `akka.cluster.pubsub.DistributedPubSubMediator`
- * with settings defined in config section `sample.cluster.client.grpc.receptionist`.
- * The `akka.cluster.pubsub.DistributedPubSubMediator` is started by the `akka.cluster.pubsub.DistributedPubSub`
- * extension.
- */
-final class ClusterClientReceptionist implements Extension {
-
- public static ClusterClientReceptionist get(ActorSystem system) {
- return ClusterClientReceptionistExtension.INSTANCE.get(system);
- }
-
- private final ExtendedActorSystem system;
- private final LoggingAdapter log;
- public final ClusterReceptionistSettings settings;
- private final Optional<String> role;
-
- public ClusterClientReceptionist(ExtendedActorSystem system) {
- this.system = system;
- this.log = Logging.getLogger(system, getClass());
- this.settings = ClusterReceptionistSettings.create(system);
- this.role = settings.role;
-
- log.info("Starting ClusterClientReceptionist gRPC server at {}", settings.hostPort);
-
- ClusterClientSerialization serialization = new ClusterClientSerialization(system);
-
- Materializer materializer = SystemMaterializer.get(system).materializer();
-
- Http.get(system).bindAndHandleAsync(
- ClusterClientReceptionistServiceHandlerFactory.create(
- new ClusterClientReceptionistGrpcImpl(settings, pubSubMediator(), serialization, materializer, log),
- system),
- ConnectHttp.toHost(settings.hostPort.hostname, settings.hostPort.port),
- materializer)
- .whenComplete((result, exc) -> {
- if (exc == null)
- log.info("ClusterClientReceptionist gRPC server stopped successfully");
- else
- log.info("ClusterClientReceptionist gRPC server stopped after failure: {}", exc);
- });
- }
-
- /**
- * Returns true if this member is not tagged with the role configured for the
- * receptionist.
- */
- public boolean isTerminated() {
- return Cluster.get(system).isTerminated() ||
- (role.isPresent() && !Cluster.get(system).getSelfRoles().contains(role.get()));
- }
-
- /**
- * Register the actors that should be reachable for the clients in this `DistributedPubSubMediator`.
- */
- private ActorRef pubSubMediator() {
- return DistributedPubSub.get(system).mediator();
- }
-
- /**
- * Register an actor that should be reachable for the clients.
- * The clients can send messages to this actor with `Send` or `SendToAll` using
- * the path elements of the `ActorRef`, e.g. `"/user/myservice"`.
- */
- public void registerService(ActorRef actor) {
- pubSubMediator().tell(new DistributedPubSubMediator.Put(actor), ActorRef.noSender());
- }
-
- /**
- * A registered actor will be automatically unregistered when terminated,
- * but it can also be explicitly unregistered before termination.
- */
- public void unregisterService(ActorRef actor) {
- pubSubMediator().tell(new DistributedPubSubMediator.Remove(actor.path().toStringWithoutAddress()), ActorRef.noSender());
- }
-
- /**
- * Register an actor that should be reachable for the clients to a named topic.
- * Several actors can be registered to the same topic name, and all will receive
- * published messages.
- * The client can publish messages to this topic with `Publish`.
- */
- public void registerSubscriber(String topic, ActorRef actor) {
- pubSubMediator().tell(new DistributedPubSubMediator.Subscribe(topic, actor), ActorRef.noSender());
- }
-
- /**
- * A registered subscriber will be automatically unregistered when terminated,
- * but it can also be explicitly unregistered before termination.
- */
- public void unregisterSubscriber(String topic, ActorRef actor) {
- pubSubMediator().tell(new DistributedPubSubMediator.Unsubscribe(topic, actor), ActorRef.noSender());
- }
-
-}
-
diff --git a/akka-sample-cluster-client-grpc-java/LICENSE b/pekko-sample-cluster-client-grpc-java/LICENSE
similarity index 100%
rename from akka-sample-cluster-client-grpc-java/LICENSE
rename to pekko-sample-cluster-client-grpc-java/LICENSE
diff --git a/akka-sample-cluster-client-grpc-java/README.md b/pekko-sample-cluster-client-grpc-java/README.md
similarity index 72%
rename from akka-sample-cluster-client-grpc-java/README.md
rename to pekko-sample-cluster-client-grpc-java/README.md
index 355456e..9e0ee7f 100644
--- a/akka-sample-cluster-client-grpc-java/README.md
+++ b/pekko-sample-cluster-client-grpc-java/README.md
@@ -1,7 +1,7 @@
# Cluster Client with gRPC transport
See purpose of this example and important clarifications of when to use this approach in
-[Migration to Akka gRPC](https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-apache-pekko-grpc).
+[Migration to Apache Pekko gRPC](https://pekko.apache.org/docs/pekko/current/cluster-client.html#migration-to-apache-pekko-grpc).
Project structure:
@@ -10,5 +10,5 @@ Project structure:
* [ClusterClient](src/main/java/sample/cluster/client/grpc/ClusterClient.java) is the actor on the client
side that messages are sent via
* [ClusterClientReceptionist](src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java)
- is an Akka extension on the cluster (server) side that implements the gRPC service and delegates
+ is an Apache Pekko extension on the cluster (server) side that implements the gRPC service and delegates
messages to actors in the cluster that have been registered in Distributed PubSub.
diff --git a/pekko-sample-cluster-client-grpc-java/pom.xml b/pekko-sample-cluster-client-grpc-java/pom.xml
new file mode 100644
index 0000000..cbdb0e4
--- /dev/null
+++ b/pekko-sample-cluster-client-grpc-java/pom.xml
@@ -0,0 +1,121 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-sample-cluster-client-grpc-java</artifactId>
+ <packaging>jar</packaging>
+ <version>empty</version>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <pekko.version>0.0.0+26617-325e2156-SNAPSHOT</pekko.version>
+ <pekko.grpc.version>0.0.0-15-3d8bff9d-SNAPSHOT</pekko.grpc.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache-pekko-snapshots</id>
+ <name>Apache Snapshots Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <layout>default</layout>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+ <pluginRepositories>
+ <pluginRepository>
+ <id>apache-pekko-snapshots</id>
+ <name>Apache Snapshots Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <layout>default</layout>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </pluginRepository>
+ </pluginRepositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-stream_2.13</artifactId>
+ <version>${pekko.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-cluster_2.13</artifactId>
+ <version>${pekko.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-cluster-tools_2.13</artifactId>
+ <version>${pekko.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-serialization-jackson_2.13</artifactId>
+ <version>${pekko.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-discovery_2.13</artifactId>
+ <version>${pekko.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-grpc-runtime_2.13</artifactId>
+ <version>${pekko.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-testkit_2.13</artifactId>
+ <version>${pekko.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <fork>true</fork>
+ <compilerArgs>
+ <arg>-Xlint</arg>
+ <arg>-parameters</arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.pekko</groupId>
+ <artifactId>pekko-grpc-maven-plugin</artifactId>
+ <version>${pekko.grpc.version}</version>
+ <configuration>
+ <language>Java</language>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java
similarity index 100%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/CborSerializable.java
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
index fbbfa0c..c62be6f 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClient.java
@@ -1,19 +1,19 @@
package sample.cluster.client.grpc;
-import akka.NotUsed;
-import akka.actor.AbstractLoggingActor;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.event.LoggingAdapter;
-import akka.japi.pf.ReceiveBuilder;
-import akka.pattern.Patterns;
-import akka.stream.KillSwitches;
-import akka.stream.Materializer;
-import akka.stream.OverflowStrategy;
-import akka.stream.SharedKillSwitch;
-import akka.stream.WatchedActorTerminatedException;
-import akka.stream.javadsl.Source;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.AbstractLoggingActor;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.actor.Props;
+import org.apache.pekko.actor.Terminated;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.japi.pf.ReceiveBuilder;
+import org.apache.pekko.pattern.Patterns;
+import org.apache.pekko.stream.KillSwitches;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.OverflowStrategy;
+import org.apache.pekko.stream.SharedKillSwitch;
+import org.apache.pekko.stream.WatchedActorTerminatedException;
+import org.apache.pekko.stream.javadsl.Source;
import java.util.HashMap;
import java.util.Map;
@@ -24,7 +24,7 @@ import java.util.concurrent.CompletionStage;
/**
* This actor is intended to be used on an external node that is not member
* of the cluster. It acts like a gateway for sending messages to actors
- * somewhere in the cluster. With service discovery and Akka gRPC it will establish
+ * somewhere in the cluster. With service discovery and Apache Pekko gRPC it will establish
* a connection to a {@link ClusterClientReceptionist} somewhere in the cluster.
* <p>
* You can send messages via the `ClusterClient` to any actor in the cluster
@@ -48,7 +48,7 @@ import java.util.concurrent.CompletionStage;
* to the named topic.
* <p>
* Use the factory method {@link ClusterClient#props ClusterClient.props}) to create the
- * `akka.actor.Props` for the actor.
+ * `org.apache.pekko.actor.Props` for the actor.
* <p>
* If the receptionist is not currently available, the client will buffer the messages
* and then deliver them when the connection to the receptionist has been established.
@@ -62,7 +62,7 @@ import java.util.concurrent.CompletionStage;
public class ClusterClient extends AbstractLoggingActor {
/**
- * Factory method for `ClusterClient` `akka.actor.Props`.
+ * Factory method for `ClusterClient` `org.apache.pekko.actor.Props`.
*/
public static Props props(ClusterClientSettings settings, Materializer materializer) {
return Props.create(ClusterClient.class, () -> new ClusterClient(settings, materializer));
@@ -133,7 +133,7 @@ public class ClusterClient extends AbstractLoggingActor {
private static ClusterClientReceptionistServiceClient createClientStub(ClusterClientSettings settings,
Materializer mat) {
- return ClusterClientReceptionistServiceClient.create(settings.grpcClientSettings,mat, mat.executionContext());
+ return ClusterClientReceptionistServiceClient.create(settings.grpcClientSettings, mat.system());
}
private static CompletionStage<ActorRef> newSession(
diff --git a/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
new file mode 100644
index 0000000..bcece3e
--- /dev/null
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionist.java
@@ -0,0 +1,138 @@
+package sample.cluster.client.grpc;
+
+import org.apache.pekko.actor.AbstractExtensionId;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.actor.ExtendedActorSystem;
+import org.apache.pekko.actor.Extension;
+import org.apache.pekko.actor.ExtensionIdProvider;
+import org.apache.pekko.cluster.Cluster;
+import org.apache.pekko.cluster.pubsub.DistributedPubSub;
+import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
+import org.apache.pekko.event.Logging;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.http.javadsl.ConnectHttp;
+import org.apache.pekko.http.javadsl.Http;
+import org.apache.pekko.http.javadsl.ServerBinding;
+import org.apache.pekko.http.javadsl.model.HttpRequest;
+import org.apache.pekko.http.javadsl.model.HttpResponse;
+import org.apache.pekko.japi.function.Function;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.SystemMaterializer;
+
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
+class ClusterClientReceptionistExtension extends AbstractExtensionId<ClusterClientReceptionist>
+ implements ExtensionIdProvider {
+ public static final ClusterClientReceptionistExtension INSTANCE = new ClusterClientReceptionistExtension();
+
+ private ClusterClientReceptionistExtension() {
+ }
+
+ public ClusterClientReceptionistExtension lookup() {
+ return ClusterClientReceptionistExtension.INSTANCE;
+ }
+
+ public ClusterClientReceptionist createExtension(ExtendedActorSystem system) {
+ return new ClusterClientReceptionist(system);
+ }
+}
+
+/**
+ * Extension that starts gRPC service and accompanying `org.apache.pekko.cluster.pubsub.DistributedPubSubMediator`
+ * with settings defined in config section `sample.cluster.client.grpc.receptionist`.
+ * The `org.apache.pekko.cluster.pubsub.DistributedPubSubMediator` is started by the `org.apache.pekko.cluster.pubsub.DistributedPubSub`
+ * extension.
+ */
+final class ClusterClientReceptionist implements Extension {
+
+ public static ClusterClientReceptionist get(ActorSystem system) {
+ return ClusterClientReceptionistExtension.INSTANCE.get(system);
+ }
+
+ private final ExtendedActorSystem system;
+ private final LoggingAdapter log;
+ public final ClusterReceptionistSettings settings;
+ private final Optional<String> role;
+
+ public ClusterClientReceptionist(ExtendedActorSystem system) {
+ this.system = system;
+ this.log = Logging.getLogger(system, getClass());
+ this.settings = ClusterReceptionistSettings.create(system);
+ this.role = settings.role;
+
+ log.info("Starting ClusterClientReceptionist gRPC server at {}", settings.hostPort);
+
+ ClusterClientSerialization serialization = new ClusterClientSerialization(system);
+
+ Materializer materializer = SystemMaterializer.get(system).materializer();
+
+ Function<HttpRequest, CompletionStage<HttpResponse>> service = ClusterClientReceptionistServiceHandlerFactory.create(
+ new ClusterClientReceptionistGrpcImpl(settings, pubSubMediator(), serialization, materializer, log),
+ system);
+ CompletionStage<ServerBinding> bound =
+ Http.get(system)
+ .newServerAt(settings.hostPort.hostname, settings.hostPort.port)
+ .bind(service)
+ .whenComplete((result, exc) -> {
+ if (exc == null)
+ log.info("ClusterClientReceptionist gRPC server stopped successfully");
+ else
+ log.info("ClusterClientReceptionist gRPC server stopped after failure: {}", exc);
+ });
+ }
+
+ /**
+ * Returns true if this member is not tagged with the role configured for the
+ * receptionist.
+ */
+ public boolean isTerminated() {
+ return Cluster.get(system).isTerminated() ||
+ (role.isPresent() && !Cluster.get(system).getSelfRoles().contains(role.get()));
+ }
+
+ /**
+ * Register the actors that should be reachable for the clients in this `DistributedPubSubMediator`.
+ */
+ private ActorRef pubSubMediator() {
+ return DistributedPubSub.get(system).mediator();
+ }
+
+ /**
+ * Register an actor that should be reachable for the clients.
+ * The clients can send messages to this actor with `Send` or `SendToAll` using
+ * the path elements of the `ActorRef`, e.g. `"/user/myservice"`.
+ */
+ public void registerService(ActorRef actor) {
+ pubSubMediator().tell(new DistributedPubSubMediator.Put(actor), ActorRef.noSender());
+ }
+
+ /**
+ * A registered actor will be automatically unregistered when terminated,
+ * but it can also be explicitly unregistered before termination.
+ */
+ public void unregisterService(ActorRef actor) {
+ pubSubMediator().tell(new DistributedPubSubMediator.Remove(actor.path().toStringWithoutAddress()), ActorRef.noSender());
+ }
+
+ /**
+ * Register an actor that should be reachable for the clients to a named topic.
+ * Several actors can be registered to the same topic name, and all will receive
+ * published messages.
+ * The client can publish messages to this topic with `Publish`.
+ */
+ public void registerSubscriber(String topic, ActorRef actor) {
+ pubSubMediator().tell(new DistributedPubSubMediator.Subscribe(topic, actor), ActorRef.noSender());
+ }
+
+ /**
+ * A registered subscriber will be automatically unregistered when terminated,
+ * but it can also be explicitly unregistered before termination.
+ */
+ public void unregisterSubscriber(String topic, ActorRef actor) {
+ pubSubMediator().tell(new DistributedPubSubMediator.Unsubscribe(topic, actor), ActorRef.noSender());
+ }
+
+}
+
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
index 07c010f..71bca70 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientReceptionistGrpcImpl.java
@@ -1,13 +1,13 @@
package sample.cluster.client.grpc;
-import akka.NotUsed;
-import akka.actor.ActorRef;
-import akka.cluster.pubsub.DistributedPubSubMediator;
-import akka.event.LoggingAdapter;
-import akka.pattern.Patterns;
-import akka.stream.Materializer;
-import akka.stream.OverflowStrategy;
-import akka.stream.javadsl.Source;
+import org.apache.pekko.NotUsed;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
+import org.apache.pekko.event.LoggingAdapter;
+import org.apache.pekko.pattern.Patterns;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.stream.OverflowStrategy;
+import org.apache.pekko.stream.javadsl.Source;
import java.util.Optional;
import java.util.UUID;
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
similarity index 79%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
index 38c23bd..3781852 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSerialization.java
@@ -1,10 +1,10 @@
package sample.cluster.client.grpc;
-import akka.actor.ActorSystem;
-import akka.serialization.Serialization;
-import akka.serialization.SerializationExtension;
-import akka.serialization.Serializer;
-import akka.serialization.Serializers;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.serialization.Serialization;
+import org.apache.pekko.serialization.SerializationExtension;
+import org.apache.pekko.serialization.Serializer;
+import org.apache.pekko.serialization.Serializers;
import com.google.protobuf.ByteString;
class ClusterClientSerialization {
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
index b010621..d497313 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterClientSettings.java
@@ -1,7 +1,7 @@
package sample.cluster.client.grpc;
-import akka.actor.ActorSystem;
-import akka.grpc.GrpcClientSettings;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.grpc.GrpcClientSettings;
import com.typesafe.config.Config;
import java.time.Duration;
diff --git a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
similarity index 97%
rename from akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
rename to pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
index f34e8bb..93d2d32 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
+++ b/pekko-sample-cluster-client-grpc-java/src/main/java/sample/cluster/client/grpc/ClusterReceptionistSettings.java
@@ -1,6 +1,6 @@
package sample.cluster.client.grpc;
-import akka.actor.ActorSystem;
+import org.apache.pekko.actor.ActorSystem;
import com.typesafe.config.Config;
import java.time.Duration;
diff --git a/akka-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto b/pekko-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto
similarity index 100%
rename from akka-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto
rename to pekko-sample-cluster-client-grpc-java/src/main/protobuf/clusterclient.proto
diff --git a/akka-sample-cluster-client-grpc-java/src/main/resources/application.conf b/pekko-sample-cluster-client-grpc-java/src/main/resources/application.conf
similarity index 91%
rename from akka-sample-cluster-client-grpc-java/src/main/resources/application.conf
rename to pekko-sample-cluster-client-grpc-java/src/main/resources/application.conf
index 3cbbe70..f46d234 100644
--- a/akka-sample-cluster-client-grpc-java/src/main/resources/application.conf
+++ b/pekko-sample-cluster-client-grpc-java/src/main/resources/application.conf
@@ -23,10 +23,10 @@ sample.cluster.client.grpc {
buffer-size = 1000
}
-akka.actor {
+pekko.actor {
serialization-bindings {
"sample.cluster.client.grpc.CborSerializable" = jackson-cbor
}
}
-akka.http.server.preview.enable-http2 = on
+pekko.http.server.preview.enable-http2 = on
diff --git a/akka-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java b/pekko-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
similarity index 93%
rename from akka-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
rename to pekko-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
index b0ef6a8..97bed5b 100644
--- a/akka-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
+++ b/pekko-sample-cluster-client-grpc-java/src/test/java/sample/cluster/client/grpc/ClusterClientTest.java
@@ -1,17 +1,17 @@
package sample.cluster.client.grpc;
-import akka.actor.AbstractActor;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.cluster.Cluster;
-import akka.cluster.pubsub.DistributedPubSub;
-import akka.cluster.pubsub.DistributedPubSubMediator;
-import akka.japi.pf.ReceiveBuilder;
-import akka.pattern.Patterns;
-import akka.stream.Materializer;
-import akka.testkit.javadsl.TestKit;
+import org.apache.pekko.actor.AbstractActor;
+import org.apache.pekko.actor.ActorRef;
+import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.actor.Props;
+import org.apache.pekko.cluster.Cluster;
+import org.apache.pekko.cluster.pubsub.DistributedPubSub;
+import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
+import org.apache.pekko.japi.pf.ReceiveBuilder;
+import org.apache.pekko.pattern.Patterns;
+import org.apache.pekko.stream.Materializer;
+import org.apache.pekko.testkit.javadsl.TestKit;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.AfterClass;
@@ -47,8 +47,8 @@ public class ClusterClientTest {
private static Config clusterConfig(int grpcPort) {
return ConfigFactory.parseString(
- "akka.actor.provider = cluster \n" +
- "akka.remote.artery.canonical.port = 0 \n" +
+ "pekko.actor.provider = cluster \n" +
+ "pekko.remote.artery.canonical.port = 0 \n" +
"sample.cluster.client.grpc.receptionist.canonical.port = " + grpcPort + " \n" +
"").withFallback(ConfigFactory.load());
}
@@ -57,7 +57,7 @@ public class ClusterClientTest {
public static void setup() {
Config clientConfig =
ConfigFactory.parseString(
- "akka.actor.provider = local \n" +
+ "pekko.actor.provider = local \n" +
"").withFallback(ConfigFactory.load());
clientNode = ActorSystem.create("ClusterClientTest");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org