You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2019/01/14 18:09:28 UTC

[geode-examples] branch develop updated: GEODE-5806: Adding example for durable messaging in client subscriptions (#69)

This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-examples.git


The following commit(s) were added to refs/heads/develop by this push:
     new a2b7afa  GEODE-5806: Adding example for durable messaging in client subscriptions (#69)
a2b7afa is described below

commit a2b7afada61030e3d05f76be90687e673c0c8e03
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Mon Jan 14 10:09:24 2019 -0800

    GEODE-5806: Adding example for durable messaging in client subscriptions (#69)
---
 README.md                                          |   4 +-
 durableMessaging/README.md                         |  51 ++++++++++
 durableMessaging/scripts/start.gfsh                |  24 +++++
 durableMessaging/scripts/stop.gfsh                 |  18 ++++
 .../geode_examples/durableMessaging/Example.java   | 113 +++++++++++++++++++++
 gradle/rat.gradle                                  |   1 +
 settings.gradle                                    |   1 +
 7 files changed, 210 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 804c3d1..9cbee7a 100644
--- a/README.md
+++ b/README.md
@@ -86,8 +86,8 @@ tutorial.
 ### Advanced
 
 *  [Lucene Spatial Indexing](luceneSpatial/README.md)
-*  WAN Gateway
-*  Durable subscriptions
+*  [WAN Gateway](wan/README.md)
+*  [Durable Messaging for Subscriptions](durableMessaging/README.md)
 *  Delta propagation
 *  Network partition detection
 *  D-lock
diff --git a/durableMessaging/README.md b/durableMessaging/README.md
new file mode 100644
index 0000000..f80a13d
--- /dev/null
+++ b/durableMessaging/README.md
@@ -0,0 +1,51 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Geode Durable Messaging Example
+
+This example demonstrates Apache Geode's Durable Messaging feature.
+Use durable messaging for subscriptions that you need maintained for your clients even when your clients are down or disconnected.
+You can configure any of your event subscriptions as durable. Events for durable queries and subscriptions are saved in a queue when the client
+is disconnected and played back when the client reconnects. Other queries and subscriptions are removed from the queue.
+
+The example performs the following tasks to demonstrate durable messaging:
+
+1. Create a client cache with durable messaging enabled
+2. Register interest in all keys in the example region with durable messaging enabled
+3. Close the client cache, simulating a disconnection
+4. Start a second client, and do puts while the first client is down
+5. Restart the first client, and observe that the create events in the durable queue are delivered. A simple cache listener is used to print output to the terminal as create events are received. If interested, see [Cache Listeners](../listener/README.md) for more details on how cache listeners work.
+
+This example assumes you have installed Java and Geode.
+
+## Steps
+
+1. From the `geode-examples/durableMessaging` directory, build the example.
+
+        $ ../gradlew build
+
+2. Next start a locator, start a server, and create a region.
+
+        $ gfsh run --file=scripts/start.gfsh
+
+3. Run the example to demonstrate durable messaging.
+
+        $ ../gradlew run
+
+4. Shut down the locator and server.
+
+        $ gfsh run --file=scripts/stop.gfsh
diff --git a/durableMessaging/scripts/start.gfsh b/durableMessaging/scripts/start.gfsh
new file mode 100644
index 0000000..4c845bc
--- /dev/null
+++ b/durableMessaging/scripts/start.gfsh
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+start locator --name=locator --bind-address=127.0.0.1
+
+start server --name=server --locators=127.0.0.1[10334] --server-port=0
+
+list members
+
+create region --name=example-region --type=REPLICATE
+describe region --name=example-region
diff --git a/durableMessaging/scripts/stop.gfsh b/durableMessaging/scripts/stop.gfsh
new file mode 100644
index 0000000..7672d87
--- /dev/null
+++ b/durableMessaging/scripts/stop.gfsh
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+connect --locator=127.0.0.1[10334]
+shutdown --include-locators=true
diff --git a/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java b/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java
new file mode 100644
index 0000000..25b96b8
--- /dev/null
+++ b/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode_examples.durableMessaging;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+
+/** Shows that events queued for a disconnected durable client are replayed on reconnect. */
+public class Example {
+  private static final int NUM_EVENTS = 10;
+  private static final CountDownLatch waitForEventsLatch = new CountDownLatch(NUM_EVENTS);
+
+  /** Runs the example; blocks until the durable client has received all queued events. */
+  public static void main(String[] args) throws Exception {
+    ClientCache clientCacheOne = createDurableClient();
+
+    final String regionName = "example-region";
+
+    // Create a proxy region (no local storage) that forwards operations to the server region
+    ClientRegionFactory<Integer, String> clientOneRegionFactory =
+        clientCacheOne.createClientRegionFactory(ClientRegionShortcut.PROXY);
+    Region<Integer, String> exampleClientRegionOne = clientOneRegionFactory.create(regionName);
+
+    // Register durable interest (second argument) to create the durable client message queue
+    exampleClientRegionOne.registerInterestForAllKeys(InterestResultPolicy.DEFAULT, true);
+
+    // Close with keepalive set to true so the durable client messages are preserved for the
+    // duration of the configured timeout. In practice, it is more likely the client would
+    // disconnect due to a temporary network issue, but here the cache is explicitly closed.
+    clientCacheOne.close(true);
+
+    // Create a second client to do puts with while the first client is disconnected
+    ClientCache clientCacheTwo = new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334)
+        .set(LOG_LEVEL, "WARN").create();
+
+    ClientRegionFactory<Integer, String> clientTwoRegionFactory =
+        clientCacheTwo.createClientRegionFactory(ClientRegionShortcut.PROXY);
+    Region<Integer, String> exampleClientRegionTwo = clientTwoRegionFactory.create(regionName);
+
+    for (int i = 0; i < NUM_EVENTS; ++i) {
+      exampleClientRegionTwo.put(i, "testValue" + i);
+    }
+
+    // Close the second client and restart the durable client
+    clientCacheTwo.close(false);
+
+    clientCacheOne = createDurableClient();
+
+    // Add an example cache listener so this client can react when the server sends this client's
+    // events from the durable message queue. This isn't required but helps illustrate delivery.
+    clientOneRegionFactory = clientCacheOne.createClientRegionFactory(ClientRegionShortcut.PROXY);
+    exampleClientRegionOne = clientOneRegionFactory
+        .addCacheListener(new ExampleCacheListener<Integer, String>()).create(regionName);
+
+    // Signal to the server that this client is ready to receive events. Events in this
+    // client's durable message queue will then be delivered and trigger the cache listener.
+    clientCacheOne.readyForEvents();
+
+    // Use a count down latch to ensure that this client receives all queued events from the server
+    waitForEventsLatch.await();
+
+    // All queued events have arrived; close the durable client to release its connections
+    clientCacheOne.close(false);
+  }
+
+  /** Creates a durable client cache connected through the locator at 127.0.0.1[10334]. */
+  private static ClientCache createDurableClient() {
+    return new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334)
+        // Provide a unique identifier for this client's durable subscription message queue
+        .set(DURABLE_CLIENT_ID, "1")
+        // Timeout in seconds for how long the server will wait for the client to reconnect.
+        // If this property isn't set explicitly, it defaults to 300 seconds.
+        .set(DURABLE_CLIENT_TIMEOUT, "200")
+        // This is required so the client can register interest for all keys on this durable client
+        .setPoolSubscriptionEnabled(true).set(LOG_LEVEL, "WARN").create();
+  }
+
+  /** Prints each create event and counts down the latch that {@code main} waits on. */
+  public static class ExampleCacheListener<K, V> extends CacheListenerAdapter<K, V> {
+    public ExampleCacheListener() {}
+
+    @Override
+    public void afterCreate(EntryEvent<K, V> event) {
+      System.out.println(
+          "Received create for key " + event.getKey() + " after durable client reconnection");
+      waitForEventsLatch.countDown();
+    }
+  }
+}
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index bc0c2ab..90d09db 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -63,6 +63,7 @@ rat {
 
     // working directories
     '**/locator/**',
+    '**/server/**',
     '**/server1/**',
     '**/server2/**',
     '**/locator-ln/**',
diff --git a/settings.gradle b/settings.gradle
index c283d8c..18a22a4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,6 +22,7 @@ include 'queries'
 include 'lucene'
 include 'loader'
 include 'putall'
+include 'durableMessaging'
 include 'cq'
 include 'clientSecurity'
 include 'functions'