You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/03/10 03:53:37 UTC

ignite git commit: IGNITE-2756 - Fixed StreamVisitorExample

Repository: ignite
Updated Branches:
  refs/heads/master c31f384d5 -> a579d4541


IGNITE-2756 - Fixed StreamVisitorExample


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a579d454
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a579d454
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a579d454

Branch: refs/heads/master
Commit: a579d45415b3063ac72069924a15e87beda0447c
Parents: c31f384
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 9 18:53:30 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 9 18:53:30 2016 -0800

----------------------------------------------------------------------
 .../streaming/StreamVisitorExample.java         | 31 +++++++++++++-------
 1 file changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a579d454/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
index 29781bc..c3d8c64 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
@@ -35,12 +35,16 @@ import org.apache.ignite.examples.ExamplesUtils;
 import org.apache.ignite.stream.StreamVisitor;
 
 /**
- * Stream random numbers into the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
- *     <li>Start streaming using {@link StreamVisitorExample}.</li>
- * </ul>
+ * This examples demonstrates the stream visitor which allows to customize the processing
+ * of the streamed data on the server side. Instead of populating the cache for which the
+ * streamer is created, we will calculate aggregated data on the fly and save results in
+ * another cache.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
  */
 public class StreamVisitorExample {
     /** Random number generator. */
@@ -53,7 +57,7 @@ public class StreamVisitorExample {
     private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};
 
     /** Caches' names. */
-    private static final String CACHE_NAME = "instCache";
+    private static final String INSTRUMENTS_CACHE_NAME = "instCache";
     private static final String MARKET_TICKS_CACHE_NAME = "marketTicks";
 
     public static void main(String[] args) throws Exception {
@@ -65,7 +69,7 @@ public class StreamVisitorExample {
                 return;
 
             // Financial instrument cache configuration.
-            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>(CACHE_NAME);
+            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>(INSTRUMENTS_CACHE_NAME);
 
             // Index key and value for querying financial instruments.
             // Note that Instrument class has @QuerySqlField annotation for secondary field indexing.
@@ -77,6 +81,10 @@ public class StreamVisitorExample {
                 IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
             ) {
                 try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) {
+                    // To achieve proper indexing we should use fully-qualified name
+                    // of the class as a type name when binary object is created.
+                    final String instTypeName = Instrument.class.getName();
+
                     // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
                     // Instead we update the instruments in the 'instCache'.
                     // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated.
@@ -85,14 +93,15 @@ public class StreamVisitorExample {
                             String symbol = e.getKey();
                             Double tick = e.getValue();
 
-                            IgniteCache<String, BinaryObject> binInstCache = ignite.cache("instCache").withKeepBinary();
+                            IgniteCache<String, BinaryObject> binInstCache =
+                                ignite.cache(INSTRUMENTS_CACHE_NAME).withKeepBinary();
 
                             BinaryObject inst = binInstCache.get(symbol);
 
                             BinaryObjectBuilder instBuilder;
 
                             if (inst == null) {
-                                instBuilder = ignite.binary().builder("Instrument");
+                                instBuilder = ignite.binary().builder(instTypeName);
 
                                 // Constructor logic.
                                 instBuilder.setField(
@@ -146,7 +155,7 @@ public class StreamVisitorExample {
             }
             finally {
                 // Distributed cache could be removed from cluster only by #destroyCache() call.
-                ignite.destroyCache(CACHE_NAME);
+                ignite.destroyCache(INSTRUMENTS_CACHE_NAME);
                 ignite.destroyCache(MARKET_TICKS_CACHE_NAME);
             }
         }