You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by db...@apache.org on 2018/06/20 16:48:45 UTC

[geode-examples] branch master updated: Adding a CQ Example (#58)

This is an automated email from the ASF dual-hosted git repository.

dbarnes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-examples.git


The following commit(s) were added to refs/heads/master by this push:
     new 46f8ff6  Adding a CQ Example (#58)
46f8ff6 is described below

commit 46f8ff679f8dd699291ced104dad11cedd594105
Author: Addison Huddy <ad...@gmail.com>
AuthorDate: Wed Jun 20 09:48:43 2018 -0700

    Adding a CQ Example (#58)
    
    * Adding a CQ Example, reformatting functions directory
---
 build.gradle                                       | 219 +++++++++++----------
 cq/README.md                                       |  53 +++++
 cq/scripts/start.gfsh                              |  24 +++
 cq/scripts/stop.gfsh                               |  19 ++
 .../java/org/apache/geode_examples/cq/Example.java | 129 ++++++++++++
 .../geode_examples/cq/RandomEventListener.java     |  47 +++++
 .../functions/ExampleTest.java                     |   6 +-
 gradle/wrapper/gradle-wrapper.properties           |   4 +-
 .../queries/EmployeeData.java                      |   0
 .../queries/Example.java                           |   0
 settings.gradle                                    |   1 +
 11 files changed, 389 insertions(+), 113 deletions(-)

diff --git a/build.gradle b/build.gradle
index d148d2c..e8e1db6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,136 +16,139 @@
  */
 
 plugins {
-  id "org.nosphere.apache.rat" version "0.3.0"
-  id "com.diffplug.gradle.spotless" version "3.0.0"
-  id "de.undercouch.download" version "3.1.2"
+    id "org.nosphere.apache.rat" version "0.3.0"
+    id "com.diffplug.gradle.spotless" version "3.0.0"
+    id "de.undercouch.download" version "3.1.2"
 }
 
 allprojects {
-  repositories {
-    if (geodeRepositoryUrl != "") {
-      maven {
-        url geodeRepositoryUrl
-      }
+    repositories {
+        if (geodeRepositoryUrl != "") {
+            maven {
+                url geodeRepositoryUrl
+            }
+        }
+        mavenCentral()
+        maven {
+            url 'http://repository.apache.org/snapshots'
+        }
     }
-    mavenCentral()
-    maven {
-      url 'http://repository.apache.org/snapshots'
-    }
-  }
 }
 
 def installDir = "$buildDir/apache-geode-${geodeVersion}"
 
 configurations {
-  geodeDistribution
+    geodeDistribution
 }
 
 dependencies {
-  geodeDistribution "org.apache.geode:apache-geode:$geodeVersion"
+    geodeDistribution "org.apache.geode:apache-geode:$geodeVersion"
+
 }
 
 task installGeode(type: Copy) {
-  from zipTree(configurations.geodeDistribution.singleFile)
-  into buildDir
+    from zipTree(configurations.geodeDistribution.singleFile)
+    into buildDir
 }
 
 subprojects {
-  apply plugin:'java'
-
-  dependencies {
-    compile "org.apache.geode:geode-core:$geodeVersion"
-
-    testCompile "com.jayway.awaitility:awaitility:$awaitilityVersion"
-    testCompile "junit:junit:$junitVersion"
-    testCompile "org.mockito:mockito-core:$mockitocoreVersion"
-    testCompile "com.github.stefanbirkner:system-rules:$systemrulesVersion"
-    testCompile "org.assertj:assertj-core:$assertjVersion"
-    compile "org.apache.logging.log4j:log4j-core:$log4jVersion"
-    runtime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
-  }
-
-  jar {
-    archiveName "${baseName}.${extension}"
-  }
-  
-  task cleanServer {
-    doLast {
-      delete 'locator'
-      delete 'server1'
-      delete 'server2'
+    apply plugin: 'java'
+
+    dependencies {
+        compile "org.apache.geode:geode-core:$geodeVersion"
+        compile "org.apache.geode:geode-cq:$geodeVersion"
+        compile 'com.google.guava:guava:25.1-jre'
+
+        testCompile "com.jayway.awaitility:awaitility:$awaitilityVersion"
+        testCompile "junit:junit:$junitVersion"
+        testCompile "org.mockito:mockito-core:$mockitocoreVersion"
+        testCompile "com.github.stefanbirkner:system-rules:$systemrulesVersion"
+        testCompile "org.assertj:assertj-core:$assertjVersion"
+        compile "org.apache.logging.log4j:log4j-core:$log4jVersion"
+        runtime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
     }
-  }
-  clean.finalizedBy cleanServer
-
-  def geodePath = "${System.env.PATH}${System.getProperty('path.separator')}${installDir}/bin"
-  task start(type: Exec, dependsOn: [installGeode, build, cleanServer]) {
-    workingDir projectDir
-    environment 'GEODE_HOME', installDir
-    environment 'PATH', geodePath
-    commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/start.gfsh"
-  }
-
-  task stop(type: Exec, dependsOn: installGeode) {
-    workingDir projectDir
-    environment 'GEODE_HOME', installDir
-    environment 'PATH', geodePath
-    commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/stop.gfsh"
-  }
-
-  task run(type: JavaExec, dependsOn: build) {
-    description = 'Run example'
-    classpath = sourceSets.main.runtimeClasspath
-    main = "org.apache.geode_examples.${project.name}.Example"
-  }
-
-  task waitForExitingMembers(type: Exec) {
-    workingDir projectDir
-    environment 'GEODE_HOME', installDir
-    environment 'PATH', geodePath
-    ignoreExitValue true
-    commandLine 'sh', '-c', "" +
-            "TIMEOUT=120 ;" +
-            "echo \"Waiting at most \$TIMEOUT seconds for all members to shut down...\" ;" +
-            "while pgrep -f \"(Server|Locator)Launcher\" > /dev/null ; do" +
-            "  printf \".\" ; " +
-            "  sleep 1 ;" +
-            "  TIMEOUT=\$((\$TIMEOUT - 1)) ;" +
-            "  if [ \$TIMEOUT -eq 0 ] ; then" +
-            "    echo \"\" ;" +
-            "    exit 10 ;" +
-            "  fi ;" +
-            "done ;" +
-            "echo \"\""
-    doLast {
-      // We use exit code 10 to avoid conflict with pgrep exit codes.
-      if (execResult.exitValue == 10) {
-        throw new GradleException("A member process persisted beyond permitted timeout.  Aborting.")
-      } else if (execResult.exitValue != 0) {
-        throw new GradleException("waitForExistingMembers failed with exit code: " + execResult.exitValue)
-      }
+
+    jar {
+        archiveName "${baseName}.${extension}"
     }
-  }
-
-  task verifyNoMembersRunning(type: Exec) {
-    workingDir projectDir
-    environment 'GEODE_HOME', installDir
-    environment 'PATH', geodePath
-    ignoreExitValue true
-    commandLine 'sh', '-c', "echo \"Looking for existing member processes...\" ; " +
-            "pgrep -f \"(Server|Locator)Launcher\" ; "
-    doLast {
-      if (execResult.exitValue == 0) {
-        throw new GradleException("Existing members detected.  Examples expect a clean environment in which to run.")
-      }
+
+    task cleanServer {
+        doLast {
+            delete 'locator'
+            delete 'server1'
+            delete 'server2'
+        }
+    }
+    clean.finalizedBy cleanServer
+
+    def geodePath = "${System.env.PATH}${System.getProperty('path.separator')}${installDir}/bin"
+    task start(type: Exec, dependsOn: [installGeode, build, cleanServer]) {
+        workingDir projectDir
+        environment 'GEODE_HOME', installDir
+        environment 'PATH', geodePath
+        commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/start.gfsh"
+    }
+
+    task stop(type: Exec, dependsOn: installGeode) {
+        workingDir projectDir
+        environment 'GEODE_HOME', installDir
+        environment 'PATH', geodePath
+        commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/stop.gfsh"
+    }
+
+    task run(type: JavaExec, dependsOn: build) {
+        description = 'Run example'
+        classpath = sourceSets.main.runtimeClasspath
+        main = "org.apache.geode_examples.${project.name}.Example"
+    }
+
+    task waitForExitingMembers(type: Exec) {
+        workingDir projectDir
+        environment 'GEODE_HOME', installDir
+        environment 'PATH', geodePath
+        ignoreExitValue true
+        commandLine 'sh', '-c', "" +
+                "TIMEOUT=120 ;" +
+                "echo \"Waiting at most \$TIMEOUT seconds for all members to shut down...\" ;" +
+                "while pgrep -f \"(Server|Locator)Launcher\" > /dev/null ; do" +
+                "  printf \".\" ; " +
+                "  sleep 1 ;" +
+                "  TIMEOUT=\$((\$TIMEOUT - 1)) ;" +
+                "  if [ \$TIMEOUT -eq 0 ] ; then" +
+                "    echo \"\" ;" +
+                "    exit 10 ;" +
+                "  fi ;" +
+                "done ;" +
+                "echo \"\""
+        doLast {
+            // We use exit code 10 to avoid conflict with pgrep exit codes.
+            if (execResult.exitValue == 10) {
+                throw new GradleException("A member process persisted beyond permitted timeout.  Aborting.")
+            } else if (execResult.exitValue != 0) {
+                throw new GradleException("waitForExistingMembers failed with exit code: " + execResult.exitValue)
+            }
+        }
+    }
+
+    task verifyNoMembersRunning(type: Exec) {
+        workingDir projectDir
+        environment 'GEODE_HOME', installDir
+        environment 'PATH', geodePath
+        ignoreExitValue true
+        commandLine 'sh', '-c', "echo \"Looking for existing member processes...\" ; " +
+                "pgrep -f \"(Server|Locator)Launcher\" ; "
+        doLast {
+            if (execResult.exitValue == 0) {
+                throw new GradleException("Existing members detected.  Examples expect a clean environment in which to run.")
+            }
+        }
     }
-  }
 
-  task runAll(dependsOn: [verifyNoMembersRunning, start, run, stop, waitForExitingMembers])
-  start.mustRunAfter verifyNoMembersRunning
-  run.mustRunAfter start
-  stop.mustRunAfter run
-  waitForExitingMembers.mustRunAfter stop
+    task runAll(dependsOn: [verifyNoMembersRunning, start, run, stop, waitForExitingMembers])
+    start.mustRunAfter verifyNoMembersRunning
+    run.mustRunAfter start
+    stop.mustRunAfter run
+    waitForExitingMembers.mustRunAfter stop
 }
 
 apply from: "gradle/spotless.gradle"
diff --git a/cq/README.md b/cq/README.md
new file mode 100644
index 0000000..823426a
--- /dev/null
+++ b/cq/README.md
@@ -0,0 +1,53 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Geode Continuous Query Example
+
+This is a simple example that demonstrates Apache Geode's Continuous Queries(CQs) feature.  CQs allow clients to subscribe
+to server-side events using a SQL-like query.  When a client registers a CQ, the client will receive all events that
+modify the query results.
+
+In this example, the client program will first register a CQ with the query 
+`SELECT * FROM /example-region i where i > 70`. The region has keys and values that are both Integer types.
+ 
+The program loops, randomly generating two integers to put on the server as the key and value.
+ 
+If a value is either created or updated that is greater than 70, the above CQ will trigger the `RandomEventLister`,
+which prints to stdout.
+
+The client will generate data for 20 seconds, close the CQ and Cache, and then exit.
+
+This example assumes you have installed Java and Geode.
+
+## Steps
+
+1. From the `geode-examples/cq` directory, build the example and
+   run unit tests.
+
+        $ ../gradlew build
+
+2. Next start a locator, start a server, and create a region.
+
+        $ gfsh run --file=scripts/start.gfsh
+
+3. Run the example to demonstrate continues queries. W
+
+        $ ../gradlew run
+
+4. Shut down the system.
+
+        $ gfsh run --file=scripts/stop.gfsh
diff --git a/cq/scripts/start.gfsh b/cq/scripts/start.gfsh
new file mode 100755
index 0000000..71eeeeb
--- /dev/null
+++ b/cq/scripts/start.gfsh
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+start locator --name=locator --bind-address=127.0.0.1
+start server --name=server1 --locators=127.0.0.1[10334] --server-port=0 --classpath=../build/classes/main
+start server --name=server2 --locators=127.0.0.1[10334] --server-port=0 --classpath=../build/classes/main
+list members
+
+create region --name=example-region --type=REPLICATE
+describe region --name=example-region
diff --git a/cq/scripts/stop.gfsh b/cq/scripts/stop.gfsh
new file mode 100755
index 0000000..15cd93c
--- /dev/null
+++ b/cq/scripts/stop.gfsh
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+connect --locator=127.0.0.1[10334]
+shutdown --include-locators=true
diff --git a/cq/src/main/java/org/apache/geode_examples/cq/Example.java b/cq/src/main/java/org/apache/geode_examples/cq/Example.java
new file mode 100644
index 0000000..b510c39
--- /dev/null
+++ b/cq/src/main/java/org/apache/geode_examples/cq/Example.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode_examples.cq;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+
+
+public class Example {
+
+  private ClientCache cache;
+  private Region<Integer, Integer> region;
+  private CqQuery randomTracker;
+
+  private void init() throws CqException, RegionNotFoundException, CqExistsException {
+    // init cache, region, and CQ
+
+    // connect to the locator using default port 10334
+    this.cache = connectToLocallyRunningGeodge();
+
+    // create a local region that matches the server region
+    this.region = cache.<Integer, Integer>createClientRegionFactory(ClientRegionShortcut.PROXY)
+        .create("example-region");
+
+    this.randomTracker = this.startCQ(this.cache, this.region);
+  }
+
+  private void run() throws InterruptedException {
+
+    this.startPuttingData(this.region);
+
+  }
+
+  private void close() throws CqException {
+
+    // close the CQ and Cache
+    this.randomTracker.close();
+    this.cache.close();
+
+  }
+
+
+  public static void main(String[] args) throws Exception {
+
+    Example mExample = new Example();
+
+    mExample.init();
+
+    mExample.run();
+
+    mExample.close();
+
+    System.out.println("\n---- So that is CQ's----\n");
+
+  }
+
+  private CqQuery startCQ(ClientCache cache, Region region)
+      throws CqException, RegionNotFoundException, CqExistsException {
+    // Get cache and queryService - refs to local cache and QueryService
+
+    CqAttributesFactory cqf = new CqAttributesFactory();
+    cqf.addCqListener(new RandomEventListener());
+    CqAttributes cqa = cqf.create();
+
+    String cqName = "randomTracker";
+
+    String queryStr = "SELECT * FROM /example-region i where i > 70";
+
+    QueryService queryService = region.getRegionService().getQueryService();
+    CqQuery randomTracker = queryService.newCq(cqName, queryStr, cqa);
+    randomTracker.execute();
+
+
+    System.out.println("------- CQ is running\n");
+
+    return randomTracker;
+  }
+
+  private void startPuttingData(Region region) throws InterruptedException {
+
+    // Example will run for 20 second
+
+    Stopwatch stopWatch = Stopwatch.createStarted();
+
+    while (stopWatch.elapsed(TimeUnit.SECONDS) < 20) {
+
+      // 500ms delay to make this easier to follow
+      Thread.sleep(500);
+      int randomKey = ThreadLocalRandom.current().nextInt(0, 99 + 1);
+      int randomValue = ThreadLocalRandom.current().nextInt(0, 100 + 1);
+      region.put(randomKey, randomValue);
+      System.out.println("Key: " + randomKey + "     Value: " + randomValue);
+
+    }
+
+    stopWatch.stop();
+
+  }
+
+
+  private ClientCache connectToLocallyRunningGeodge() {
+    ClientCache cache = new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334)
+        .setPoolSubscriptionEnabled(true).set("log-level", "WARN").create();
+
+    return cache;
+  }
+
+}
diff --git a/cq/src/main/java/org/apache/geode_examples/cq/RandomEventListener.java b/cq/src/main/java/org/apache/geode_examples/cq/RandomEventListener.java
new file mode 100644
index 0000000..ee2e802
--- /dev/null
+++ b/cq/src/main/java/org/apache/geode_examples/cq/RandomEventListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode_examples.cq;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+
+public class RandomEventListener implements CqListener {
+
+  @Override
+  public void onEvent(CqEvent cqEvent) {
+
+    Operation queryOperation = cqEvent.getQueryOperation();
+
+
+    if (queryOperation.isUpdate()) {
+      System.out.print("-------Updated Value\n");
+    } else if (queryOperation.isCreate()) {
+      System.out.print("-------Value Created\n");
+    }
+  }
+
+  @Override
+  public void onError(CqEvent cqEvent) {
+    System.out.print("**Something bad happened**");
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+}
diff --git a/functions/src/test/java/org/apache/geode/examples/functions/ExampleTest.java b/functions/src/test/java/org/apache/geode_examples/functions/ExampleTest.java
similarity index 97%
rename from functions/src/test/java/org/apache/geode/examples/functions/ExampleTest.java
rename to functions/src/test/java/org/apache/geode_examples/functions/ExampleTest.java
index da2f4a0..e34e0cf 100644
--- a/functions/src/test/java/org/apache/geode/examples/functions/ExampleTest.java
+++ b/functions/src/test/java/org/apache/geode_examples/functions/ExampleTest.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode_examples.functions;
 
-import static org.junit.Assert.assertEquals;
+import static org.jgroups.util.Util.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -22,12 +22,12 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 
+import org.junit.Test;
+
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.ResultCollector;
 
-import org.junit.Test;
-
 public class ExampleTest {
   @Test
   public void testExample() throws Exception {
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 211d62b..5ecb5df 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Mon Jun 20 23:30:08 PDT 2016
+#Tue Jun 12 15:19:00 PDT 2018
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-3.3-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-3.3-all.zip
diff --git a/queries/src/main/java/org/apache/geode/examples/queries/EmployeeData.java b/queries/src/main/java/org/apache/geode_examples/queries/EmployeeData.java
similarity index 100%
rename from queries/src/main/java/org/apache/geode/examples/queries/EmployeeData.java
rename to queries/src/main/java/org/apache/geode_examples/queries/EmployeeData.java
diff --git a/queries/src/main/java/org/apache/geode/examples/queries/Example.java b/queries/src/main/java/org/apache/geode_examples/queries/Example.java
similarity index 100%
rename from queries/src/main/java/org/apache/geode/examples/queries/Example.java
rename to queries/src/main/java/org/apache/geode_examples/queries/Example.java
diff --git a/settings.gradle b/settings.gradle
index 86ebb89..d5857ee 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,6 +22,7 @@ include 'queries'
 include 'lucene'
 include 'loader'
 include 'putall'
+include 'cq'
 include 'clientSecurity'
 include 'functions'
 include 'persistence'