You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/05/04 04:00:01 UTC

[4/4] curator git commit: finished docs for the pub-sub example

finished docs for the pub-sub example


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5ac1a331
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5ac1a331
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5ac1a331

Branch: refs/heads/CURATOR-397
Commit: 5ac1a3314e372d8bd22ef7bba704b5621a1638b8
Parents: c97fae4
Author: randgalt <ra...@apache.org>
Authored: Wed May 3 22:59:47 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed May 3 22:59:47 2017 -0500

----------------------------------------------------------------------
 curator-examples/src/main/java/pubsub/README.md | 16 ++++++++++++++
 .../src/main/java/pubsub/SubPubTest.java        | 23 +++++++++++++-------
 2 files changed, 31 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5ac1a331/curator-examples/src/main/java/pubsub/README.md
----------------------------------------------------------------------
diff --git a/curator-examples/src/main/java/pubsub/README.md b/curator-examples/src/main/java/pubsub/README.md
index 51a7048..adea2ef 100644
--- a/curator-examples/src/main/java/pubsub/README.md
+++ b/curator-examples/src/main/java/pubsub/README.md
@@ -56,3 +56,19 @@ In this example, the TypedModelSpecs are defined in `Clients.java`. E.g.
 public static final TypedModeledFramework<Instance, InstanceType> instanceClient = 
    TypedModeledFramework.from(ModeledFramework.builder(), ModelSpecs.instanceModelSpec)
 ```
+
+## Publisher
+
+`Publisher.java` shows how to use the ModeledFramework to write models. There are methods to write single instances and to write lists of instances in a transaction. Each publish method resolves the appropriate typed client and then calls its `set()` method with the given model.
+
+## Subscriber
+
+`Subscriber.java` uses CachedModeledFrameworks to listen for changes on the parent nodes for all of the models in this example. Each of the methods resolves the appropriate typed client and then starts the cache (via `cached()`).
+
+## SubPubTest
+
+`SubPubTest.java` is a class that exercises this example. 
+
+* `start()` uses `Subscriber` to start a `CachedModeledFramework` for Instances of each InstanceType, and for LocationAvailables and UserCreateds of each combination of Group and Priority. It then adds a simple listener to each cache that merely prints the class name and path whenever an update occurs (see `generalListener()`).
+* `start()` also starts a scheduled task that runs every second. This task calls `publishSomething()`.
+* `publishSomething()` randomly publishes either a single Instance, LocationAvailable, UserCreated or a list of those.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/curator/blob/5ac1a331/curator-examples/src/main/java/pubsub/SubPubTest.java
----------------------------------------------------------------------
diff --git a/curator-examples/src/main/java/pubsub/SubPubTest.java b/curator-examples/src/main/java/pubsub/SubPubTest.java
index 90b7699..b5c1629 100644
--- a/curator-examples/src/main/java/pubsub/SubPubTest.java
+++ b/curator-examples/src/main/java/pubsub/SubPubTest.java
@@ -50,14 +50,14 @@ public class SubPubTest implements Closeable
 {
     private final TestingServer testingServer;
     private final AsyncCuratorFramework client;
-    private final Publisher publisher;
     private final ScheduledExecutorService executorService;
     private final List<CachedModeledFramework<Instance>> instanceSubscribers = new ArrayList<>();
     private final List<CachedModeledFramework<LocationAvailable>> locationAvailableSubscribers = new ArrayList<>();
     private final List<CachedModeledFramework<UserCreated>> userCreatedSubscribers = new ArrayList<>();
 
-    private static final AtomicLong id = new AtomicLong(1);
+    private static final AtomicLong nextId = new AtomicLong(1);
 
+    // arrays of random values used for this example
     private static final Group[] groups = {new Group("main"), new Group("admin")};
     private static final String[] hostnames = {"host1", "host2", "host3"};
     private static final Integer[] ports = {80, 443, 9999};
@@ -70,7 +70,7 @@ public class SubPubTest implements Closeable
         try ( SubPubTest subPubTest = new SubPubTest() )
         {
             subPubTest.start();
-            TimeUnit.MINUTES.sleep(1);
+            TimeUnit.MINUTES.sleep(1);  // run the test for a minute then exit
         }
     }
 
@@ -78,7 +78,6 @@ public class SubPubTest implements Closeable
     {
         this.testingServer = new TestingServer();
         client = AsyncCuratorFramework.wrap(CuratorFrameworkFactory.newClient(testingServer.getConnectString(), new RetryOneTime(1)));
-        publisher = new Publisher(client);
         executorService = Executors.newSingleThreadScheduledExecutor();
     }
 
@@ -86,30 +85,37 @@ public class SubPubTest implements Closeable
     {
         client.unwrap().start();
 
+        Publisher publisher = new Publisher(client);
         Subscriber subscriber = new Subscriber(client);
+
+        // start a subscriber/cache for Instances of each InstanceType
         instanceSubscribers.addAll(
             Arrays.stream(InstanceType.values())
             .map(subscriber::startInstanceSubscriber)
             .collect(Collectors.toList())
         );
 
+        // start a subscriber/cache for LocationAvailables of each combination of Group and Priority
         locationAvailableSubscribers.addAll(
             Arrays.stream(Priority.values())
                 .flatMap(priority -> Arrays.stream(groups).map(group -> subscriber.startLocationAvailableSubscriber(group, priority)))
                 .collect(Collectors.toList())
         );
 
+        // start a subscriber/cache for UserCreateds of each combination of Group and Priority
         userCreatedSubscribers.addAll(
             Arrays.stream(Priority.values())
                 .flatMap(priority -> Arrays.stream(groups).map(group -> subscriber.startUserCreatedSubscriber(group, priority)))
                 .collect(Collectors.toList())
         );
 
+        // add listeners for each of the caches
         instanceSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener()));
         locationAvailableSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener()));
         userCreatedSubscribers.forEach(s -> s.getCache().listenable().addListener(generalListener()));
 
-        executorService.scheduleAtFixedRate(this::publishSomething, 1, 1, TimeUnit.SECONDS);
+        // schedule the publisher task once a second
+        executorService.scheduleAtFixedRate(() -> publishSomething(publisher), 1, 1, TimeUnit.SECONDS);
     }
 
     @Override
@@ -132,9 +138,10 @@ public class SubPubTest implements Closeable
         testingServer.close();
     }
 
-    private void publishSomething()
+    private void publishSomething(Publisher publisher)
     {
-        switch ( ThreadLocalRandom.current().nextInt(5) )
+        // randomly do some publishing - either single items or lists of items in a transaction
+        switch ( ThreadLocalRandom.current().nextInt(6) )
         {
             case 0:
             {
@@ -206,6 +213,6 @@ public class SubPubTest implements Closeable
 
     private String nextId()
     {
-        return Long.toString(id.getAndIncrement());
+        return Long.toString(nextId.getAndIncrement());
     }
 }