You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/12/18 13:09:19 UTC
ignite git commit: IGNITE-2123: Need to add EntryProcessorExample to
cache examples
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5 24eccb87f -> 32cec9946
IGNITE-2123: Need to add EntryProcessorExample to cache examples
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32cec994
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32cec994
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32cec994
Branch: refs/heads/ignite-1.5
Commit: 32cec99465e667e98010da87140a9eb80bfca743
Parents: 24eccb8
Author: Roman Shtykh <ap...@gmail.com>
Authored: Fri Dec 18 15:09:00 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Dec 18 15:09:00 2015 +0300
----------------------------------------------------------------------
RELEASE_NOTES.txt | 1 +
.../datagrid/CacheEntryProcessorExample.java | 157 +++++++++++++++++++
.../datagrid/CacheEntryProcessorExample.java | 147 +++++++++++++++++
.../ScalarCacheEntryProcessorExample.scala | 125 +++++++++++++++
.../ignite/examples/CacheExamplesSelfTest.java | 8 +
.../java8/examples/CacheExamplesSelfTest.java | 8 +
.../tests/examples/ScalarExamplesSelfTest.scala | 5 +
7 files changed, 451 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/RELEASE_NOTES.txt
----------------------------------------------------------------------
diff --git a/RELEASE_NOTES.txt b/RELEASE_NOTES.txt
index 243ec18..b0822c9 100644
--- a/RELEASE_NOTES.txt
+++ b/RELEASE_NOTES.txt
@@ -18,6 +18,7 @@ Apache Ignite In-Memory Data Fabric 1.5
* Fixed and improved cache types configuration.
* Fixed cache rebalancing.
* Many stability and fault-tolerance fixes.
+* Added example to demonstrate the usage of EntryProcessor.
Complete list of closed issues: https://issues.apache.org/jira/issues/?jql=project%20%3D%20IGNITE%20AND%20fixVersion%20%3D%201.5%20AND%20status%20%3D%20closed
http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java
new file mode 100644
index 0000000..38d9631
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+
+/**
+ * This example demonstrates the simplest code that populates the distributed cache
+ * and co-locates simple closure execution with each key. The goal of this particular
+ * example is to provide the simplest code example of this logic using EntryProcessor.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheEntryProcessorExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheEntryProcessorExample.class.getSimpleName();
+
+    /** Number of keys. */
+    private static final int KEY_CNT = 20;
+
+    /** Predefined set of keys ({@code 0} to {@code KEY_CNT - 1}) used by the bulk operations. */
+    private static final Set<Integer> KEYS_SET;
+
+    /**
+     * Initializes keys set that is used in bulk operations in the example.
+     */
+    static {
+        KEYS_SET = new HashSet<>();
+
+        for (int i = 0; i < KEY_CNT; i++)
+            KEYS_SET.add(i);
+    }
+
+    /**
+     * Executes example: starts a node, creates the cache, populates it with
+     * {@code invoke}, increments every value with {@code invokeAll} and finally
+     * destroys the cache.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Entry processor example started.");
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(CACHE_NAME)) {
+                // Demonstrates usage of IgniteCache.invoke(...) method.
+                populateEntriesWithInvoke(cache);
+
+                // Demonstrates usage of IgniteCache.invokeAll(...) method.
+                incrementEntriesWithInvokeAll(cache);
+            }
+            finally {
+                // Closing the cache does not remove a distributed cache from the cluster,
+                // so destroy it explicitly. Otherwise its entries would stay on remote nodes
+                // and the next run of the example would not start from an empty cache.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Populates cache with values using {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method.
+     * Every key {@code k} from {@code 0} to {@code KEY_CNT - 1} that has no value yet
+     * is assigned {@code (k + 1) * 10}.
+     *
+     * @param cache Cache that must be populated.
+     */
+    private static void populateEntriesWithInvoke(IgniteCache<Integer, Integer> cache) {
+        // The cache is expected to be empty at this point.
+        printCacheEntries(cache);
+
+        System.out.println("");
+        System.out.println(">> Populating the cache using EntryProcessor.");
+
+        // Invokes EntryProcessor for every key sequentially.
+        for (int i = 0; i < KEY_CNT; i++) {
+            cache.invoke(i, new EntryProcessor<Integer, Integer, Object>() {
+                @Override public Object process(MutableEntry<Integer, Integer> entry,
+                    Object... args) throws EntryProcessorException {
+                    // Initializes entry's value if it's not set.
+                    if (entry.getValue() == null)
+                        entry.setValue((entry.getKey() + 1) * 10);
+
+                    return null;
+                }
+            });
+        }
+
+        // Prints out entries that are set using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Increments values of entries stored in the cache using
+     * {@link IgniteCache#invokeAll(Set, EntryProcessor, Object...)} method.
+     * Adds {@code 5} to the value of every key in the predefined keys set; the
+     * entries are expected to be populated before this method is called.
+     *
+     * @param cache Cache instance.
+     */
+    private static void incrementEntriesWithInvokeAll(IgniteCache<Integer, Integer> cache) {
+        System.out.println("");
+        System.out.println(">> Incrementing values in the cache using EntryProcessor.");
+
+        // Using IgniteCache.invokeAll to increment every value in place.
+        cache.invokeAll(KEYS_SET, new EntryProcessor<Integer, Integer, Object>() {
+            @Override public Object process(MutableEntry<Integer, Integer> entry,
+                Object... args) throws EntryProcessorException {
+                entry.setValue(entry.getValue() + 5);
+
+                return null;
+            }
+        });
+
+        // Prints out entries that are incremented using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Prints out all the entries of the predefined keys set that are stored in a cache.
+     *
+     * @param cache Cache.
+     */
+    private static void printCacheEntries(IgniteCache<Integer, Integer> cache) {
+        System.out.println();
+        System.out.println(">>> Entries in the cache.");
+
+        Map<Integer, Integer> entries = cache.getAll(KEYS_SET);
+
+        if (entries.isEmpty())
+            System.out.println("No entries in the cache.");
+        else {
+            for (Map.Entry<Integer, Integer> entry : entries.entrySet())
+                System.out.println("Entry [key=" + entry.getKey() + ", value=" + entry.getValue() + ']');
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java
new file mode 100644
index 0000000..fd07fa5
--- /dev/null
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.datagrid;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+
+/**
+ * This example demonstrates the simplest code that populates the distributed cache and co-locates simple closure
+ * execution with each key. The goal of this particular example is to provide the simplest code example of this logic
+ * using EntryProcessor.
+ * <p>
+ * Remote nodes should always be started with special configuration file which enables P2P
+ * class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with
+ * {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheEntryProcessorExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheEntryProcessorExample.class.getSimpleName();
+
+    /** Number of keys. */
+    private static final int KEY_CNT = 20;
+
+    /** Set of predefined keys ({@code 0} to {@code KEY_CNT - 1}) used by the bulk operations. */
+    private static final Set<Integer> KEYS_SET;
+
+    /**
+     * Initializes keys set that is used in bulk operations in the example.
+     */
+    static {
+        KEYS_SET = new HashSet<>();
+
+        for (int i = 0; i < KEY_CNT; i++)
+            KEYS_SET.add(i);
+    }
+
+    /**
+     * Executes example: starts a node, creates the cache, populates it with {@code invoke}, increments every value
+     * with {@code invokeAll} and finally destroys the cache.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Entry processor example started.");
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(CACHE_NAME)) {
+                // Demonstrates usage of IgniteCache.invoke(...) method.
+                populateEntriesWithInvoke(cache);
+
+                // Demonstrates usage of IgniteCache.invokeAll(...) method.
+                incrementEntriesWithInvokeAll(cache);
+            }
+            finally {
+                // Closing the cache does not remove a distributed cache from the cluster,
+                // so destroy it explicitly. Otherwise its entries would stay on remote nodes
+                // and the next run of the example would not start from an empty cache.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Populates cache with values using {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method.
+     * Every key {@code k} from {@code 0} to {@code KEY_CNT - 1} that has no value yet is assigned
+     * {@code (k + 1) * 10}.
+     *
+     * @param cache Cache that must be populated.
+     */
+    private static void populateEntriesWithInvoke(IgniteCache<Integer, Integer> cache) {
+        // The cache is expected to be empty at this point.
+        printCacheEntries(cache);
+
+        System.out.println("");
+        System.out.println(">> Populating the cache using EntryProcessor.");
+
+        // Invokes EntryProcessor for every key sequentially.
+        for (int i = 0; i < KEY_CNT; i++) {
+            cache.invoke(i, (entry, args) -> {
+                // Initializes entry's value if it's not set.
+                if (entry.getValue() == null)
+                    entry.setValue((entry.getKey() + 1) * 10);
+
+                return null;
+            });
+        }
+
+        // Prints out entries that are set using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Increments values of entries stored in the cache using {@link IgniteCache#invokeAll(Set, EntryProcessor,
+     * Object...)} method. Adds {@code 5} to the value of every key in the predefined keys set; the entries are
+     * expected to be populated before this method is called.
+     *
+     * @param cache Cache instance.
+     */
+    private static void incrementEntriesWithInvokeAll(IgniteCache<Integer, Integer> cache) {
+        System.out.println("");
+        System.out.println(">> Incrementing values in the cache using EntryProcessor.");
+
+        // Using IgniteCache.invokeAll to increment every value in place.
+        cache.invokeAll(KEYS_SET, (entry, args) -> {
+            entry.setValue(entry.getValue() + 5);
+
+            return null;
+        });
+
+        // Prints out entries that are incremented using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Prints out all the entries of the predefined keys set that are stored in a cache.
+     *
+     * @param cache Cache.
+     */
+    private static void printCacheEntries(IgniteCache<Integer, Integer> cache) {
+        System.out.println();
+        System.out.println(">>> Entries in the cache.");
+
+        Map<Integer, Integer> entries = cache.getAll(KEYS_SET);
+
+        if (entries.isEmpty())
+            System.out.println("No entries in the cache.");
+        else {
+            for (Map.Entry<Integer, Integer> entry : entries.entrySet())
+                System.out.println("Entry [key=" + entry.getKey() + ", value=" + entry.getValue() + ']');
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala
new file mode 100644
index 0000000..ffcbbfd
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.scalar.examples
+
+import javax.cache.processor.{EntryProcessor, MutableEntry}
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.scalar.scalar
+import org.apache.ignite.scalar.scalar._
+
+/**
+ * This example demonstrates the simplest code that populates the distributed cache
+ * and co-locates simple closure execution with each key. The goal of this particular
+ * example is to provide the simplest code example of this logic using EntryProcessor.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+object ScalarCacheEntryProcessorExample extends App {
+    /** Configuration file name. */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /** Name of cache. */
+    private val CACHE_NAME = ScalarCacheEntryProcessorExample.getClass.getSimpleName
+
+    /** Number of keys. */
+    private val KEY_CNT = 20
+
+    /** Type alias. */
+    type Cache = IgniteCache[String, Int]
+
+    /*
+     * Note that in case of `LOCAL` configuration,
+     * since there is no distribution, values may come back as `nulls`.
+     */
+    scalar(CONFIG) {
+        println()
+        println(">>> Entry processor example started.")
+
+        val cache = createCache$[String, Int](CACHE_NAME)
+
+        try {
+            populateEntriesWithInvoke(cache)
+
+            checkEntriesInCache(cache)
+
+            incrementEntriesWithInvoke(cache)
+
+            checkEntriesInCache(cache)
+        }
+        finally {
+            cache.destroy()
+        }
+    }
+
+    /**
+     * Prints the value read from the cache for every key `"0"` to `(KEY_CNT - 1).toString`.
+     *
+     * @param cache Cache to read from.
+     */
+    private def checkEntriesInCache(cache: Cache): Unit = {
+        println()
+        println(">>> Entries in the cache.")
+
+        (0 until KEY_CNT).foreach(i =>
+            println("Entry: " + cache.get(i.toString)))
+    }
+
+    /**
+     * Calls {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} for every key to create
+     * entries when they don't exist. The entry for key `i.toString` gets value `i`.
+     *
+     * @param cache Cache to populate.
+     */
+    private def populateEntriesWithInvoke(cache: Cache): Unit = {
+        (0 until KEY_CNT).foreach(i =>
+            cache.invoke(i.toString,
+                new EntryProcessor[String, Int, Object]() {
+                    override def process(e: MutableEntry[String, Int], args: AnyRef*): Object = {
+                        // The value type is a Scala `Int`, which is never equal to `null`
+                        // (a missing value is unboxed to 0), so absence has to be
+                        // detected with `exists()` rather than a null check.
+                        if (!e.exists())
+                            e.setValue(i)
+
+                        null
+                    }
+                }
+            )
+        )
+    }
+
+    /**
+     * Calls {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} for every key to increment
+     * the value of each existing entry by one.
+     *
+     * @param cache Cache whose entries are incremented.
+     */
+    private def incrementEntriesWithInvoke(cache: Cache): Unit = {
+        println()
+        println(">>> Incrementing values.")
+
+        (0 until KEY_CNT).foreach(i =>
+            cache.invoke(i.toString,
+                new EntryProcessor[String, Int, Object]() {
+                    override def process(e: MutableEntry[String, Int], args: AnyRef*): Object = {
+                        // `Option(e.getValue)` is always `Some` for an `Int` value,
+                        // so check `exists()` to skip entries that are not in the cache.
+                        if (e.exists())
+                            e.setValue(e.getValue + 1)
+
+                        null
+                    }
+                }
+            )
+        )
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index 050c59f..39c2ea6 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.examples;
import org.apache.ignite.examples.datagrid.CacheAffinityExample;
+import org.apache.ignite.examples.datagrid.CacheEntryProcessorExample;
import org.apache.ignite.examples.datagrid.CacheApiExample;
import org.apache.ignite.examples.datagrid.CacheContinuousQueryExample;
import org.apache.ignite.examples.datagrid.CacheDataStreamerExample;
@@ -49,6 +50,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
/**
* @throws Exception If failed.
*/
+ public void testCacheEntryProcessorExample() throws Exception {
+ // Runs the example's main method with EMPTY_ARGS; any exception it throws propagates out of this test.
+ CacheEntryProcessorExample.main(EMPTY_ARGS);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testCacheAtomicLongExample() throws Exception {
IgniteAtomicLongExample.main(EMPTY_ARGS);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
index 4446521..e44fec3 100644
--- a/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.java8.examples;
import org.apache.ignite.examples.java8.datagrid.CacheAffinityExample;
+import org.apache.ignite.examples.java8.datagrid.CacheEntryProcessorExample;
import org.apache.ignite.examples.java8.datagrid.CacheApiExample;
import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest;
@@ -36,6 +37,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
CacheAffinityExample.main(EMPTY_ARGS);
}
+ /**
+ * Runs the Java 8 flavor of {@link CacheEntryProcessorExample} with {@code EMPTY_ARGS}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCacheEntryProcessorExample() throws Exception {
+ CacheEntryProcessorExample.main(EMPTY_ARGS);
+ }
+
// TODO: IGNITE-711 next example(s) should be implemented for java 8
// or testing method(s) should be removed if example(s) does not applicable for java 8.
// /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
index ef56434..94c41ad 100644
--- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
+++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
@@ -35,6 +35,11 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik
}
/** */
+ def testScalarCacheEntryProcessorExample() {
+ // Runs the Scalar example's main method with EMPTY_ARGS; any exception it throws propagates out of this test.
+ ScalarCacheEntryProcessorExample.main(EMPTY_ARGS)
+ }
+
+ /** */
def testScalarCacheExample() {
ScalarCacheExample.main(EMPTY_ARGS)
}