You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by db...@apache.org on 2018/06/20 16:48:45 UTC
[geode-examples] branch master updated: Adding a CQ Example (#58)
This is an automated email from the ASF dual-hosted git repository.
dbarnes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geode-examples.git
The following commit(s) were added to refs/heads/master by this push:
new 46f8ff6 Adding a CQ Example (#58)
46f8ff6 is described below
commit 46f8ff679f8dd699291ced104dad11cedd594105
Author: Addison Huddy <ad...@gmail.com>
AuthorDate: Wed Jun 20 09:48:43 2018 -0700
Adding a CQ Example (#58)
* Adding a CQ Example, reformatting functions directory
---
build.gradle | 219 +++++++++++----------
cq/README.md | 53 +++++
cq/scripts/start.gfsh | 24 +++
cq/scripts/stop.gfsh | 19 ++
.../java/org/apache/geode_examples/cq/Example.java | 129 ++++++++++++
.../geode_examples/cq/RandomEventListener.java | 47 +++++
.../functions/ExampleTest.java | 6 +-
gradle/wrapper/gradle-wrapper.properties | 4 +-
.../queries/EmployeeData.java | 0
.../queries/Example.java | 0
settings.gradle | 1 +
11 files changed, 389 insertions(+), 113 deletions(-)
diff --git a/build.gradle b/build.gradle
index d148d2c..e8e1db6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -16,136 +16,139 @@
*/
plugins {
- id "org.nosphere.apache.rat" version "0.3.0"
- id "com.diffplug.gradle.spotless" version "3.0.0"
- id "de.undercouch.download" version "3.1.2"
+ id "org.nosphere.apache.rat" version "0.3.0"
+ id "com.diffplug.gradle.spotless" version "3.0.0"
+ id "de.undercouch.download" version "3.1.2"
}
allprojects {
- repositories {
- if (geodeRepositoryUrl != "") {
- maven {
- url geodeRepositoryUrl
- }
+ repositories {
+ if (geodeRepositoryUrl != "") {
+ maven {
+ url geodeRepositoryUrl
+ }
+ }
+ mavenCentral()
+ maven {
+ url 'http://repository.apache.org/snapshots'
+ }
}
- mavenCentral()
- maven {
- url 'http://repository.apache.org/snapshots'
- }
- }
}
def installDir = "$buildDir/apache-geode-${geodeVersion}"
configurations {
- geodeDistribution
+ geodeDistribution
}
dependencies {
- geodeDistribution "org.apache.geode:apache-geode:$geodeVersion"
+ geodeDistribution "org.apache.geode:apache-geode:$geodeVersion"
+
}
task installGeode(type: Copy) {
- from zipTree(configurations.geodeDistribution.singleFile)
- into buildDir
+ from zipTree(configurations.geodeDistribution.singleFile)
+ into buildDir
}
subprojects {
- apply plugin:'java'
-
- dependencies {
- compile "org.apache.geode:geode-core:$geodeVersion"
-
- testCompile "com.jayway.awaitility:awaitility:$awaitilityVersion"
- testCompile "junit:junit:$junitVersion"
- testCompile "org.mockito:mockito-core:$mockitocoreVersion"
- testCompile "com.github.stefanbirkner:system-rules:$systemrulesVersion"
- testCompile "org.assertj:assertj-core:$assertjVersion"
- compile "org.apache.logging.log4j:log4j-core:$log4jVersion"
- runtime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
- }
-
- jar {
- archiveName "${baseName}.${extension}"
- }
-
- task cleanServer {
- doLast {
- delete 'locator'
- delete 'server1'
- delete 'server2'
+ apply plugin: 'java'
+
+ dependencies {
+ compile "org.apache.geode:geode-core:$geodeVersion"
+ compile "org.apache.geode:geode-cq:$geodeVersion"
+ compile 'com.google.guava:guava:25.1-jre'
+
+ testCompile "com.jayway.awaitility:awaitility:$awaitilityVersion"
+ testCompile "junit:junit:$junitVersion"
+ testCompile "org.mockito:mockito-core:$mockitocoreVersion"
+ testCompile "com.github.stefanbirkner:system-rules:$systemrulesVersion"
+ testCompile "org.assertj:assertj-core:$assertjVersion"
+ compile "org.apache.logging.log4j:log4j-core:$log4jVersion"
+ runtime "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
}
- }
- clean.finalizedBy cleanServer
-
- def geodePath = "${System.env.PATH}${System.getProperty('path.separator')}${installDir}/bin"
- task start(type: Exec, dependsOn: [installGeode, build, cleanServer]) {
- workingDir projectDir
- environment 'GEODE_HOME', installDir
- environment 'PATH', geodePath
- commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/start.gfsh"
- }
-
- task stop(type: Exec, dependsOn: installGeode) {
- workingDir projectDir
- environment 'GEODE_HOME', installDir
- environment 'PATH', geodePath
- commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/stop.gfsh"
- }
-
- task run(type: JavaExec, dependsOn: build) {
- description = 'Run example'
- classpath = sourceSets.main.runtimeClasspath
- main = "org.apache.geode_examples.${project.name}.Example"
- }
-
- task waitForExitingMembers(type: Exec) {
- workingDir projectDir
- environment 'GEODE_HOME', installDir
- environment 'PATH', geodePath
- ignoreExitValue true
- commandLine 'sh', '-c', "" +
- "TIMEOUT=120 ;" +
- "echo \"Waiting at most \$TIMEOUT seconds for all members to shut down...\" ;" +
- "while pgrep -f \"(Server|Locator)Launcher\" > /dev/null ; do" +
- " printf \".\" ; " +
- " sleep 1 ;" +
- " TIMEOUT=\$((\$TIMEOUT - 1)) ;" +
- " if [ \$TIMEOUT -eq 0 ] ; then" +
- " echo \"\" ;" +
- " exit 10 ;" +
- " fi ;" +
- "done ;" +
- "echo \"\""
- doLast {
- // We use exit code 10 to avoid conflict with pgrep exit codes.
- if (execResult.exitValue == 10) {
- throw new GradleException("A member process persisted beyond permitted timeout. Aborting.")
- } else if (execResult.exitValue != 0) {
- throw new GradleException("waitForExistingMembers failed with exit code: " + execResult.exitValue)
- }
+
+ jar {
+ archiveName "${baseName}.${extension}"
}
- }
-
- task verifyNoMembersRunning(type: Exec) {
- workingDir projectDir
- environment 'GEODE_HOME', installDir
- environment 'PATH', geodePath
- ignoreExitValue true
- commandLine 'sh', '-c', "echo \"Looking for existing member processes...\" ; " +
- "pgrep -f \"(Server|Locator)Launcher\" ; "
- doLast {
- if (execResult.exitValue == 0) {
- throw new GradleException("Existing members detected. Examples expect a clean environment in which to run.")
- }
+
+ task cleanServer {
+ doLast {
+ delete 'locator'
+ delete 'server1'
+ delete 'server2'
+ }
+ }
+ clean.finalizedBy cleanServer
+
+ def geodePath = "${System.env.PATH}${System.getProperty('path.separator')}${installDir}/bin"
+ task start(type: Exec, dependsOn: [installGeode, build, cleanServer]) {
+ workingDir projectDir
+ environment 'GEODE_HOME', installDir
+ environment 'PATH', geodePath
+ commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/start.gfsh"
+ }
+
+ task stop(type: Exec, dependsOn: installGeode) {
+ workingDir projectDir
+ environment 'GEODE_HOME', installDir
+ environment 'PATH', geodePath
+ commandLine 'sh', '-c', "gfsh run --file=${projectDir}/scripts/stop.gfsh"
+ }
+
+ task run(type: JavaExec, dependsOn: build) {
+ description = 'Run example'
+ classpath = sourceSets.main.runtimeClasspath
+ main = "org.apache.geode_examples.${project.name}.Example"
+ }
+
+ task waitForExitingMembers(type: Exec) {
+ workingDir projectDir
+ environment 'GEODE_HOME', installDir
+ environment 'PATH', geodePath
+ ignoreExitValue true
+ commandLine 'sh', '-c', "" +
+ "TIMEOUT=120 ;" +
+ "echo \"Waiting at most \$TIMEOUT seconds for all members to shut down...\" ;" +
+ "while pgrep -f \"(Server|Locator)Launcher\" > /dev/null ; do" +
+ " printf \".\" ; " +
+ " sleep 1 ;" +
+ " TIMEOUT=\$((\$TIMEOUT - 1)) ;" +
+ " if [ \$TIMEOUT -eq 0 ] ; then" +
+ " echo \"\" ;" +
+ " exit 10 ;" +
+ " fi ;" +
+ "done ;" +
+ "echo \"\""
+ doLast {
+ // We use exit code 10 to avoid conflict with pgrep exit codes.
+ if (execResult.exitValue == 10) {
+ throw new GradleException("A member process persisted beyond permitted timeout. Aborting.")
+ } else if (execResult.exitValue != 0) {
+ throw new GradleException("waitForExistingMembers failed with exit code: " + execResult.exitValue)
+ }
+ }
+ }
+
+ task verifyNoMembersRunning(type: Exec) {
+ workingDir projectDir
+ environment 'GEODE_HOME', installDir
+ environment 'PATH', geodePath
+ ignoreExitValue true
+ commandLine 'sh', '-c', "echo \"Looking for existing member processes...\" ; " +
+ "pgrep -f \"(Server|Locator)Launcher\" ; "
+ doLast {
+ if (execResult.exitValue == 0) {
+ throw new GradleException("Existing members detected. Examples expect a clean environment in which to run.")
+ }
+ }
}
- }
- task runAll(dependsOn: [verifyNoMembersRunning, start, run, stop, waitForExitingMembers])
- start.mustRunAfter verifyNoMembersRunning
- run.mustRunAfter start
- stop.mustRunAfter run
- waitForExitingMembers.mustRunAfter stop
+ task runAll(dependsOn: [verifyNoMembersRunning, start, run, stop, waitForExitingMembers])
+ start.mustRunAfter verifyNoMembersRunning
+ run.mustRunAfter start
+ stop.mustRunAfter run
+ waitForExitingMembers.mustRunAfter stop
}
apply from: "gradle/spotless.gradle"
diff --git a/cq/README.md b/cq/README.md
new file mode 100644
index 0000000..823426a
--- /dev/null
+++ b/cq/README.md
@@ -0,0 +1,53 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Geode Continuous Query Example
+
+This is a simple example that demonstrates Apache Geode's Continuous Queries (CQs) feature. CQs allow clients to subscribe
+to server-side events using a SQL-like query. When a client registers a CQ, the client will receive all events that
+modify the query results.
+
+In this example, the client program will first register a CQ with the query
+`SELECT * FROM /example-region i where i > 70`. The region has keys and values that are both Integer types.
+
+The program loops, randomly generating two integers to put on the server as the key and value.
+
+If a value greater than 70 is either created or updated, the above CQ will trigger the `RandomEventListener`,
+which prints to stdout.
+
+The client will generate data for 20 seconds, close the CQ and Cache, and then exit.
+
+This example assumes you have installed Java and Geode.
+
+## Steps
+
+1. From the `geode-examples/cq` directory, build the example and
+ run unit tests.
+
+ $ ../gradlew build
+
+2. Next start a locator, start a server, and create a region.
+
+ $ gfsh run --file=scripts/start.gfsh
+
+3. Run the example to demonstrate continuous queries.
+
+ $ ../gradlew run
+
+4. Shut down the system.
+
+ $ gfsh run --file=scripts/stop.gfsh
diff --git a/cq/scripts/start.gfsh b/cq/scripts/start.gfsh
new file mode 100755
index 0000000..71eeeeb
--- /dev/null
+++ b/cq/scripts/start.gfsh
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+start locator --name=locator --bind-address=127.0.0.1
+start server --name=server1 --locators=127.0.0.1[10334] --server-port=0 --classpath=../build/classes/main
+start server --name=server2 --locators=127.0.0.1[10334] --server-port=0 --classpath=../build/classes/main
+list members
+
+create region --name=example-region --type=REPLICATE
+describe region --name=example-region
diff --git a/cq/scripts/stop.gfsh b/cq/scripts/stop.gfsh
new file mode 100755
index 0000000..15cd93c
--- /dev/null
+++ b/cq/scripts/stop.gfsh
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+connect --locator=127.0.0.1[10334]
+shutdown --include-locators=true
diff --git a/cq/src/main/java/org/apache/geode_examples/cq/Example.java b/cq/src/main/java/org/apache/geode_examples/cq/Example.java
new file mode 100644
index 0000000..b510c39
--- /dev/null
+++ b/cq/src/main/java/org/apache/geode_examples/cq/Example.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode_examples.cq;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+
+
+public class Example {
+
+ private ClientCache cache;
+ private Region<Integer, Integer> region;
+ private CqQuery randomTracker;
+
+ private void init() throws CqException, RegionNotFoundException, CqExistsException {
+ // init cache, region, and CQ
+
+ // connect to the locator using default port 10334
+ this.cache = connectToLocallyRunningGeodge();
+
+ // create a local region that matches the server region
+ this.region = cache.<Integer, Integer>createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create("example-region");
+
+ this.randomTracker = this.startCQ(this.cache, this.region);
+ }
+
+ private void run() throws InterruptedException {
+
+ this.startPuttingData(this.region);
+
+ }
+
+ private void close() throws CqException {
+
+ // close the CQ and Cache
+ this.randomTracker.close();
+ this.cache.close();
+
+ }
+
+
+ public static void main(String[] args) throws Exception {
+
+ Example mExample = new Example();
+
+ mExample.init();
+
+ mExample.run();
+
+ mExample.close();
+
+ System.out.println("\n---- So that is CQ's----\n");
+
+ }
+
+ private CqQuery startCQ(ClientCache cache, Region region)
+ throws CqException, RegionNotFoundException, CqExistsException {
+ // Get cache and queryService - refs to local cache and QueryService
+
+ CqAttributesFactory cqf = new CqAttributesFactory();
+ cqf.addCqListener(new RandomEventListener());
+ CqAttributes cqa = cqf.create();
+
+ String cqName = "randomTracker";
+
+ String queryStr = "SELECT * FROM /example-region i where i > 70";
+
+ QueryService queryService = region.getRegionService().getQueryService();
+ CqQuery randomTracker = queryService.newCq(cqName, queryStr, cqa);
+ randomTracker.execute();
+
+
+ System.out.println("------- CQ is running\n");
+
+ return randomTracker;
+ }
+
+ private void startPuttingData(Region region) throws InterruptedException {
+
+ // Example will run for 20 second
+
+ Stopwatch stopWatch = Stopwatch.createStarted();
+
+ while (stopWatch.elapsed(TimeUnit.SECONDS) < 20) {
+
+ // 500ms delay to make this easier to follow
+ Thread.sleep(500);
+ int randomKey = ThreadLocalRandom.current().nextInt(0, 99 + 1);
+ int randomValue = ThreadLocalRandom.current().nextInt(0, 100 + 1);
+ region.put(randomKey, randomValue);
+ System.out.println("Key: " + randomKey + " Value: " + randomValue);
+
+ }
+
+ stopWatch.stop();
+
+ }
+
+
+ private ClientCache connectToLocallyRunningGeodge() {
+ ClientCache cache = new ClientCacheFactory().addPoolLocator("127.0.0.1", 10334)
+ .setPoolSubscriptionEnabled(true).set("log-level", "WARN").create();
+
+ return cache;
+ }
+
+}
diff --git a/cq/src/main/java/org/apache/geode_examples/cq/RandomEventListener.java b/cq/src/main/java/org/apache/geode_examples/cq/RandomEventListener.java
new file mode 100644
index 0000000..ee2e802
--- /dev/null
+++ b/cq/src/main/java/org/apache/geode_examples/cq/RandomEventListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode_examples.cq;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.query.CqEvent;
+import org.apache.geode.cache.query.CqListener;
+
+public class RandomEventListener implements CqListener {
+
+ @Override
+ public void onEvent(CqEvent cqEvent) {
+
+ Operation queryOperation = cqEvent.getQueryOperation();
+
+
+ if (queryOperation.isUpdate()) {
+ System.out.print("-------Updated Value\n");
+ } else if (queryOperation.isCreate()) {
+ System.out.print("-------Value Created\n");
+ }
+ }
+
+ @Override
+ public void onError(CqEvent cqEvent) {
+ System.out.print("**Something bad happened**");
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+}
diff --git a/functions/src/test/java/org/apache/geode/examples/functions/ExampleTest.java b/functions/src/test/java/org/apache/geode_examples/functions/ExampleTest.java
similarity index 97%
rename from functions/src/test/java/org/apache/geode/examples/functions/ExampleTest.java
rename to functions/src/test/java/org/apache/geode_examples/functions/ExampleTest.java
index da2f4a0..e34e0cf 100644
--- a/functions/src/test/java/org/apache/geode/examples/functions/ExampleTest.java
+++ b/functions/src/test/java/org/apache/geode_examples/functions/ExampleTest.java
@@ -14,7 +14,7 @@
*/
package org.apache.geode_examples.functions;
-import static org.junit.Assert.assertEquals;
+import static org.jgroups.util.Util.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -22,12 +22,12 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import org.junit.Test;
+
import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.ResultCollector;
-import org.junit.Test;
-
public class ExampleTest {
@Test
public void testExample() throws Exception {
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 211d62b..5ecb5df 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Mon Jun 20 23:30:08 PDT 2016
+#Tue Jun 12 15:19:00 PDT 2018
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-3.3-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-3.3-all.zip
diff --git a/queries/src/main/java/org/apache/geode/examples/queries/EmployeeData.java b/queries/src/main/java/org/apache/geode_examples/queries/EmployeeData.java
similarity index 100%
rename from queries/src/main/java/org/apache/geode/examples/queries/EmployeeData.java
rename to queries/src/main/java/org/apache/geode_examples/queries/EmployeeData.java
diff --git a/queries/src/main/java/org/apache/geode/examples/queries/Example.java b/queries/src/main/java/org/apache/geode_examples/queries/Example.java
similarity index 100%
rename from queries/src/main/java/org/apache/geode/examples/queries/Example.java
rename to queries/src/main/java/org/apache/geode_examples/queries/Example.java
diff --git a/settings.gradle b/settings.gradle
index 86ebb89..d5857ee 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -22,6 +22,7 @@ include 'queries'
include 'lucene'
include 'loader'
include 'putall'
+include 'cq'
include 'clientSecurity'
include 'functions'
include 'persistence'