You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/12/18 13:42:23 UTC

[10/13] ignite git commit: IGNITE-2123: Need to add EntryProcessorExample to cache examples

IGNITE-2123: Need to add EntryProcessorExample to cache examples


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/32cec994
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/32cec994
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/32cec994

Branch: refs/heads/ignite-1.5.1-2
Commit: 32cec99465e667e98010da87140a9eb80bfca743
Parents: 24eccb8
Author: Roman Shtykh <ap...@gmail.com>
Authored: Fri Dec 18 15:09:00 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Dec 18 15:09:00 2015 +0300

----------------------------------------------------------------------
 RELEASE_NOTES.txt                               |   1 +
 .../datagrid/CacheEntryProcessorExample.java    | 157 +++++++++++++++++++
 .../datagrid/CacheEntryProcessorExample.java    | 147 +++++++++++++++++
 .../ScalarCacheEntryProcessorExample.scala      | 125 +++++++++++++++
 .../ignite/examples/CacheExamplesSelfTest.java  |   8 +
 .../java8/examples/CacheExamplesSelfTest.java   |   8 +
 .../tests/examples/ScalarExamplesSelfTest.scala |   5 +
 7 files changed, 451 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/RELEASE_NOTES.txt
----------------------------------------------------------------------
diff --git a/RELEASE_NOTES.txt b/RELEASE_NOTES.txt
index 243ec18..b0822c9 100644
--- a/RELEASE_NOTES.txt
+++ b/RELEASE_NOTES.txt
@@ -18,6 +18,7 @@ Apache Ignite In-Memory Data Fabric 1.5
 * Fixed and improved cache types configuration.
 * Fixed cache rebalancing.
 * Many stability and fault-tolerance fixes.
+* Added example to demonstrate the usage of EntryProcessor.
 
 Complete list of closed issues: https://issues.apache.org/jira/issues/?jql=project%20%3D%20IGNITE%20AND%20fixVersion%20%3D%201.5%20AND%20status%20%3D%20closed
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java
new file mode 100644
index 0000000..38d9631
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEntryProcessorExample.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+
+/**
+ * This example demonstrates the simplest code that populates the distributed cache
+ * and co-locates simple closure execution with each key. The goal of this particular
+ * example is to provide the simplest code example of this logic using EntryProcessor.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheEntryProcessorExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheEntryProcessorExample.class.getSimpleName();
+
+    /** Number of keys. */
+    private static final int KEY_CNT = 20;
+
+    /** Keys predefined set: integers from {@code 0} (inclusive) to {@link #KEY_CNT} (exclusive). */
+    private static final Set<Integer> KEYS_SET;
+
+    /**
+     * Initializes keys set that is used in bulk operations in the example.
+     */
+    static {
+        KEYS_SET = new HashSet<>();
+
+        for (int i = 0; i < KEY_CNT; i++)
+            KEYS_SET.add(i);
+    }
+
+    /**
+     * Executes example: starts a node, populates the cache with {@code invoke(...)}, increments every value
+     * with {@code invokeAll(...)} and destroys the cache afterwards.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Entry processor example started.");
+
+            try (IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(CACHE_NAME)) {
+                // Demonstrates usage of IgniteCache.invoke(...) method.
+                populateEntriesWithInvoke(cache);
+
+                // Demonstrates usage of IgniteCache.invokeAll(...) method.
+                incrementEntriesWithInvokeAll(cache);
+            }
+            finally {
+                // Closing the cache proxy does not remove the distributed cache from the cluster.
+                // Destroy it explicitly so that a repeated run starts with an empty cache again.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Populates cache with values using {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method.
+     * Prints the cache content before and after the population.
+     *
+     * @param cache Cache that must be populated.
+     */
+    private static void populateEntriesWithInvoke(IgniteCache<Integer, Integer> cache) {
+        // Must be no entry in the cache at this point.
+        printCacheEntries(cache);
+
+        System.out.println("");
+        System.out.println(">> Populating the cache using EntryProcessor.");
+
+        // Invokes EntryProcessor for every key sequentially.
+        for (int i = 0; i < KEY_CNT; i++) {
+            cache.invoke(i, new EntryProcessor<Integer, Integer, Object>() {
+                @Override public Object process(MutableEntry<Integer, Integer> entry,
+                    Object... objects) throws EntryProcessorException {
+                    // Initializes entry's value if it's not set.
+                    if (entry.getValue() == null)
+                        entry.setValue((entry.getKey() + 1) * 10);
+
+                    return null;
+                }
+            });
+        }
+
+        // Prints out entries that are set using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Increments values of entries stored in the cache using
+     * {@link IgniteCache#invokeAll(Set, EntryProcessor, Object...)} method and prints the result.
+     *
+     * @param cache Cache instance.
+     */
+    private static void incrementEntriesWithInvokeAll(IgniteCache<Integer, Integer> cache) {
+        System.out.println("");
+        System.out.println(">> Incrementing values in the cache using EntryProcessor.");
+
+        // Using IgniteCache.invokeAll to increment every value in place.
+        cache.invokeAll(KEYS_SET, new EntryProcessor<Integer, Integer, Object>() {
+            @Override public Object process(MutableEntry<Integer, Integer> entry,
+                Object... arguments) throws EntryProcessorException {
+                // Every key of KEYS_SET is populated before this method is called from main(...),
+                // so the current value is expected to be non-null here.
+                entry.setValue(entry.getValue() + 5);
+
+                return null;
+            }
+        });
+
+        // Prints out entries that are incremented using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Prints out all the entries with keys from {@link #KEYS_SET} that are stored in a cache.
+     *
+     * @param cache Cache.
+     */
+    private static void printCacheEntries(IgniteCache<Integer, Integer> cache) {
+        System.out.println();
+        System.out.println(">>> Entries in the cache.");
+
+        Map<Integer, Integer> entries = cache.getAll(KEYS_SET);
+
+        if (entries.isEmpty())
+            System.out.println("No entries in the cache.");
+        else {
+            for (Map.Entry<Integer, Integer> entry : entries.entrySet())
+                System.out.println("Entry [key=" + entry.getKey() + ", value=" + entry.getValue() + ']');
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java
new file mode 100644
index 0000000..fd07fa5
--- /dev/null
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/datagrid/CacheEntryProcessorExample.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.datagrid;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.examples.ExampleNodeStartup;
+
+/**
+ * This example demonstrates the simplest code that populates the distributed cache and co-locates simple closure
+ * execution with each key. The goal of this particular example is to provide the simplest code example of this logic
+ * using EntryProcessor.
+ * <p>
+ * Remote nodes should always be started with special configuration file which enables P2P
+ * class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will start node with
+ * {@code examples/config/example-ignite.xml} configuration.
+ */
+public class CacheEntryProcessorExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = CacheEntryProcessorExample.class.getSimpleName();
+
+    /** Number of keys. */
+    private static final int KEY_CNT = 20;
+
+    /** Set of predefined keys: integers from {@code 0} (inclusive) to {@link #KEY_CNT} (exclusive). */
+    private static final Set<Integer> KEYS_SET;
+
+    /**
+     * Initializes keys set that is used in bulk operations in the example.
+     */
+    static {
+        KEYS_SET = new HashSet<>();
+
+        for (int i = 0; i < KEY_CNT; i++)
+            KEYS_SET.add(i);
+    }
+
+    /**
+     * Executes example: starts a node, populates the cache with {@code invoke(...)}, increments every value
+     * with {@code invokeAll(...)} and destroys the cache afterwards.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Entry processor example started.");
+
+            try (IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(CACHE_NAME)) {
+                // Demonstrates usage of IgniteCache.invoke(...) method.
+                populateEntriesWithInvoke(cache);
+
+                // Demonstrates usage of IgniteCache.invokeAll(...) method.
+                incrementEntriesWithInvokeAll(cache);
+            }
+            finally {
+                // Closing the cache proxy does not remove the distributed cache from the cluster.
+                // Destroy it explicitly so that a repeated run starts with an empty cache again.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Populates cache with values using {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method.
+     * Prints the cache content before and after the population.
+     *
+     * @param cache Cache that must be populated.
+     */
+    private static void populateEntriesWithInvoke(IgniteCache<Integer, Integer> cache) {
+        // Must be no entry in the cache at this point.
+        printCacheEntries(cache);
+
+        System.out.println("");
+        System.out.println(">> Populating the cache using EntryProcessor.");
+
+        // Invokes EntryProcessor for every key sequentially.
+        for (int i = 0; i < KEY_CNT; i++) {
+            cache.invoke(i, (entry, args) -> {
+                // Initializes entry's value if it's not set.
+                if (entry.getValue() == null)
+                    entry.setValue((entry.getKey() + 1) * 10);
+
+                return null;
+            });
+        }
+
+        // Prints out entries that are set using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Increments values of entries stored in the cache using {@link IgniteCache#invokeAll(Set, EntryProcessor,
+     * Object...)} method and prints the result.
+     *
+     * @param cache Cache instance.
+     */
+    private static void incrementEntriesWithInvokeAll(IgniteCache<Integer, Integer> cache) {
+        System.out.println("");
+        System.out.println(">> Incrementing values in the cache using EntryProcessor.");
+
+        // Using IgniteCache.invokeAll to increment every value in place.
+        cache.invokeAll(KEYS_SET, (entry, args) -> {
+            // Every key of KEYS_SET is populated before this method is called from main(...),
+            // so the current value is expected to be non-null here.
+            entry.setValue(entry.getValue() + 5);
+
+            return null;
+        });
+
+        // Prints out entries that are incremented using the EntryProcessor above.
+        printCacheEntries(cache);
+    }
+
+    /**
+     * Prints out all the entries with keys from {@link #KEYS_SET} that are stored in a cache.
+     *
+     * @param cache Cache.
+     */
+    private static void printCacheEntries(IgniteCache<Integer, Integer> cache) {
+        System.out.println();
+        System.out.println(">>> Entries in the cache.");
+
+        Map<Integer, Integer> entries = cache.getAll(KEYS_SET);
+
+        if (entries.isEmpty())
+            System.out.println("No entries in the cache.");
+        else {
+            for (Map.Entry<Integer, Integer> entry : entries.entrySet())
+                System.out.println("Entry [key=" + entry.getKey() + ", value=" + entry.getValue() + ']');
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala
new file mode 100644
index 0000000..ffcbbfd
--- /dev/null
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarCacheEntryProcessorExample.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.scalar.examples
+
+import javax.cache.processor.{EntryProcessor, MutableEntry}
+
+import org.apache.ignite.IgniteCache
+import org.apache.ignite.scalar.scalar
+import org.apache.ignite.scalar.scalar._
+
+/**
+ * This example demonstrates the simplest code that populates the distributed cache
+ * and co-locates simple closure execution with each key. The goal of this particular
+ * example is to provide the simplest code example of this logic using EntryProcessor.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+object ScalarCacheEntryProcessorExample extends App {
+    /** Configuration file name. */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /** Name of cache. */
+    private val CACHE_NAME = ScalarCacheEntryProcessorExample.getClass.getSimpleName
+
+    /** Number of keys. */
+    private val KEY_CNT = 20
+
+    /** Type alias. */
+    type Cache = IgniteCache[String, Int]
+
+    /*
+     * Note that in case of `LOCAL` configuration,
+     * since there is no distribution, values may come back as `nulls`.
+     */
+    scalar(CONFIG) {
+        println()
+        println(">>> Entry processor example started.")
+
+        val cache = createCache$[String, Int](CACHE_NAME)
+
+        try {
+            populateEntriesWithInvoke(cache)
+
+            checkEntriesInCache(cache)
+
+            incrementEntriesWithInvoke(cache)
+
+            checkEntriesInCache(cache)
+        }
+        finally {
+            cache.destroy()
+        }
+    }
+
+    /**
+     * Prints the values stored under the string keys `"0"` until `KEY_CNT` (exclusive).
+     *
+     * @param cache Cache to read from.
+     */
+    private def checkEntriesInCache(cache: Cache) {
+        println()
+        println(">>> Entries in the cache.")
+
+        (0 until KEY_CNT).foreach(i =>
+            println("Entry: " + cache.get(i.toString)))
+    }
+
+    /**
+     * Uses {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} for every key to create
+     * entries when they don't exist. The entry for key `i.toString` gets value `i`.
+     *
+     * @param cache Cache to populate.
+     */
+    private def populateEntriesWithInvoke(cache: Cache) {
+        (0 until KEY_CNT).foreach(i =>
+            cache.invoke(i.toString,
+                new EntryProcessor[String, Int, Object]() {
+                    override def process(e: MutableEntry[String, Int], args: AnyRef*): Object = {
+                        // The value type is `Int`, so a missing value is unboxed to `0` and
+                        // `e.getValue == null` is always false. Ask the entry whether it exists instead.
+                        if (!e.exists())
+                            e.setValue(i)
+
+                        null
+                    }
+                }
+            )
+        )
+    }
+
+    /**
+     * Uses {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} for every key to increment
+     * the value of each existing entry by one. Missing entries are left untouched.
+     *
+     * @param cache Cache whose entries are incremented.
+     */
+    private def incrementEntriesWithInvoke(cache: Cache) {
+        println()
+        println(">>> Incrementing values.")
+
+        (0 until KEY_CNT).foreach(i =>
+            cache.invoke(i.toString,
+                new EntryProcessor[String, Int, Object]() {
+                    override def process(e: MutableEntry[String, Int], args: AnyRef*): Object = {
+                        // `Option(e.getValue)` is always `Some` for an `Int` value, so it cannot
+                        // detect a missing entry. Check existence explicitly.
+                        if (e.exists())
+                            e.setValue(e.getValue + 1)
+
+                        null
+                    }
+                }
+            )
+        )
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index 050c59f..39c2ea6 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.examples;
 
 import org.apache.ignite.examples.datagrid.CacheAffinityExample;
+import org.apache.ignite.examples.datagrid.CacheEntryProcessorExample;
 import org.apache.ignite.examples.datagrid.CacheApiExample;
 import org.apache.ignite.examples.datagrid.CacheContinuousQueryExample;
 import org.apache.ignite.examples.datagrid.CacheDataStreamerExample;
@@ -49,6 +50,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCacheEntryProcessorExample() throws Exception {
+        CacheEntryProcessorExample.main(EMPTY_ARGS); // Runs the example's entry point; EMPTY_ARGS is defined outside this hunk.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCacheAtomicLongExample() throws Exception {
         IgniteAtomicLongExample.main(EMPTY_ARGS);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java b/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
index 4446521..e44fec3 100644
--- a/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java8/org/apache/ignite/java8/examples/CacheExamplesSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.java8.examples;
 
 import org.apache.ignite.examples.java8.datagrid.CacheAffinityExample;
+import org.apache.ignite.examples.java8.datagrid.CacheEntryProcessorExample;
 import org.apache.ignite.examples.java8.datagrid.CacheApiExample;
 import org.apache.ignite.testframework.junits.common.GridAbstractExamplesTest;
 
@@ -36,6 +37,13 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
         CacheAffinityExample.main(EMPTY_ARGS);
     }
 
+    /**
+     * Runs {@link CacheEntryProcessorExample#main(String[])} with {@code EMPTY_ARGS}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCacheEntryProcessorExample() throws Exception {
+        CacheEntryProcessorExample.main(EMPTY_ARGS);
+    }
+
 //    TODO: IGNITE-711 next example(s) should be implemented for java 8
 //    or testing method(s) should be removed if example(s) does not applicable for java 8.
 //    /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/32cec994/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
index ef56434..94c41ad 100644
--- a/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
+++ b/examples/src/test/scala/org/apache/ignite/scalar/tests/examples/ScalarExamplesSelfTest.scala
@@ -35,6 +35,11 @@ class ScalarExamplesSelfTest extends GridAbstractExamplesTest with JUnitSuiteLik
     }
 
     /** */
+    def testScalarCacheEntryProcessorExample() {
+        ScalarCacheEntryProcessorExample.main(EMPTY_ARGS) // Runs the example's entry point; EMPTY_ARGS is defined outside this hunk.
+    }
+
+    /** */
     def testScalarCacheExample() {
         ScalarCacheExample.main(EMPTY_ARGS)
     }