You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2019/01/14 18:09:28 UTC
[geode-examples] branch develop updated: GEODE-5806: Adding example
for durable messaging in client subscriptions (#69)
This is an automated email from the ASF dual-hosted git repository.
mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-examples.git
The following commit(s) were added to refs/heads/develop by this push:
new a2b7afa GEODE-5806: Adding example for durable messaging in client subscriptions (#69)
a2b7afa is described below
commit a2b7afada61030e3d05f76be90687e673c0c8e03
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Mon Jan 14 10:09:24 2019 -0800
GEODE-5806: Adding example for durable messaging in client subscriptions (#69)
---
README.md | 4 +-
durableMessaging/README.md | 51 ++++++++++
durableMessaging/scripts/start.gfsh | 24 +++++
durableMessaging/scripts/stop.gfsh | 18 ++++
.../geode_examples/durableMessaging/Example.java | 113 +++++++++++++++++++++
gradle/rat.gradle | 1 +
settings.gradle | 1 +
7 files changed, 210 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index 804c3d1..9cbee7a 100644
--- a/README.md
+++ b/README.md
@@ -86,8 +86,8 @@ tutorial.
### Advanced
* [Lucene Spatial Indexing](luceneSpatial/README.md)
-* WAN Gateway
-* Durable subscriptions
+* [WAN Gateway](wan/README.md)
+* [Durable Messaging for Subscriptions](durableMessaging/README.md)
* Delta propagation
* Network partition detection
* D-lock
diff --git a/durableMessaging/README.md b/durableMessaging/README.md
new file mode 100644
index 0000000..f80a13d
--- /dev/null
+++ b/durableMessaging/README.md
@@ -0,0 +1,51 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Geode Durable Messaging Example
+
+This example demonstrates Apache Geode's Durable Messaging feature.
+Use durable messaging for subscriptions that you need maintained for your clients even when your clients are down or disconnected.
+You can configure any of your event subscriptions as durable. Events for durable queries and subscriptions are saved in a queue when the client
+is disconnected and played back when the client reconnects. Other queries and subscriptions are removed from the queue.
+
+The example performs the following tasks to demonstrate durable messaging:
+
+1. Create a client cache with durable messaging enabled
+2. Register interest in all keys in the example region with durable messaging enabled
+3. Close the client cache, simulating a disconnection
+4. Start a second client, and do puts while the first client is down
+5. Restart the first client, and observe that the create events in the durable queue are delivered. A simple cache listener is used to print output to the terminal as create events are received. If interested, see [Cache Listeners](listener/README.md) for more details on how cache listeners work.
+
+This example assumes you have installed Java and Geode.
+
+## Steps
+
+1. From the `geode-examples/durableMessaging` directory, build the example.
+
+ $ ../gradlew build
+
+2. Next start a locator, start a server, and create a region.
+
+ $ gfsh run --file=scripts/start.gfsh
+
+3. Run the example to demonstrate durable messaging.
+
+ $ ../gradlew run
+
+4. Shut down the server.
+
+ $ gfsh run --file=scripts/stop.gfsh
diff --git a/durableMessaging/scripts/start.gfsh b/durableMessaging/scripts/start.gfsh
new file mode 100644
index 0000000..4c845bc
--- /dev/null
+++ b/durableMessaging/scripts/start.gfsh
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+start locator --name=locator --bind-address=127.0.0.1
+
+start server --name=server --locators=127.0.0.1[10334] --server-port=0
+
+list members
+
+create region --name=example-region --type=REPLICATE
+describe region --name=example-region
diff --git a/durableMessaging/scripts/stop.gfsh b/durableMessaging/scripts/stop.gfsh
new file mode 100644
index 0000000..7672d87
--- /dev/null
+++ b/durableMessaging/scripts/stop.gfsh
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+connect --locator=127.0.0.1[10334]
+shutdown --include-locators=true
diff --git a/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java b/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java
new file mode 100644
index 0000000..25b96b8
--- /dev/null
+++ b/durableMessaging/src/main/java/org/apache/geode_examples/durableMessaging/Example.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode_examples.durableMessaging;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_TIMEOUT;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+
+public class Example {
+ private static final int numEvents = 10;
+ private static final CountDownLatch waitForEventsLatch = new CountDownLatch(numEvents);
+
+ public static void main(String[] args) throws Exception {
+ ClientCache clientCacheOne = createDurableClient();
+
+ final String regionName = "example-region";
+
+ // Create a local caching proxy region that matches the server region
+ ClientRegionFactory<Integer, String> clientOneRegionFactory =
+ clientCacheOne.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region<Integer, String> exampleClientRegionOne = clientOneRegionFactory.create(regionName);
+
+ // Register interest to create the durable client message queue
+ exampleClientRegionOne.registerInterestForAllKeys(InterestResultPolicy.DEFAULT, true);
+
+ // Close the client cache with keepalive set to true so
+ // the durable client messages are preserved
+ // for the duration of the configured timeout. In practice,
+ // it is more likely the client would disconnect
+ // due to a temporary network issue, but for this example the cache is explicitly closed.
+ clientCacheOne.close(true);
+
+ // Create a second client to do puts with while the first client is disconnected
+ ClientCache clientCacheTwo = new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334)
+ .set("log-level", "WARN").create();
+
+ ClientRegionFactory<Integer, String> clientTwoRegionFactory =
+ clientCacheTwo.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region<Integer, String> exampleClientRegionTwo = clientTwoRegionFactory.create(regionName);
+
+ for (int i = 0; i < numEvents; ++i) {
+ exampleClientRegionTwo.put(i, "testValue" + i);
+ }
+
+ // Close the second client and restart the durable client
+ clientCacheTwo.close(false);
+
+ clientCacheOne = createDurableClient();
+
+ // Add an example cache listener so this client can react
+ // when the server sends this client's events from the
+ // durable message queue. This isn't required but helps
+ // illustrate that the events are delivered successfully.
+ clientOneRegionFactory = clientCacheOne.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ exampleClientRegionOne = clientOneRegionFactory
+ .addCacheListener(new ExampleCacheListener<Integer, String>()).create(regionName);
+
+ // Signal to the server that this client is ready to receive events.
+ // Events in this client's durable message queue
+ // will then be delivered and trigger our example cache listener.
+ clientCacheOne.readyForEvents();
+
+ // Use a count down latch to ensure that this client receives all queued events from the server
+ waitForEventsLatch.await();
+ }
+
+ private static ClientCache createDurableClient() {
+ return new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334)
+ // Provide a unique identifier for this client's durable subscription message queue
+ .set(DURABLE_CLIENT_ID, "1")
+ // Provide a timeout in seconds for how long the server will wait for the client to
+ // reconnect.
+ // If this property isn't set explicitly, it defaults to 300 seconds.
+ .set(DURABLE_CLIENT_TIMEOUT, "200")
+ // This is required so the client can register interest for all keys on this durable client
+ .setPoolSubscriptionEnabled(true).set(LOG_LEVEL, "WARN").create();
+ }
+
+ public static class ExampleCacheListener<Integer, String>
+ extends CacheListenerAdapter<Integer, String> {
+ public ExampleCacheListener() {}
+
+ @Override
+ public void afterCreate(EntryEvent<Integer, String> event) {
+ System.out.println(
+ "Received create for key " + event.getKey() + " after durable client reconnection");
+ waitForEventsLatch.countDown();
+ }
+ }
+}
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index bc0c2ab..90d09db 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -63,6 +63,7 @@ rat {
// working directories
'**/locator/**',
+ '**/server/**',
'**/server1/**',
'**/server2/**',
'**/locator-ln/**',
diff --git a/settings.gradle b/settings.gradle
index c283d8c..18a22a4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,6 +22,7 @@ include 'queries'
include 'lucene'
include 'loader'
include 'putall'
+include 'durableMessaging'
include 'cq'
include 'clientSecurity'
include 'functions'