You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/03/15 19:18:02 UTC
[06/35] ignite git commit: IGNITE-2756 - Fixed StreamVisitorExample
IGNITE-2756 - Fixed StreamVisitorExample
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a579d454
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a579d454
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a579d454
Branch: refs/heads/ignite-2791
Commit: a579d45415b3063ac72069924a15e87beda0447c
Parents: c31f384
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 9 18:53:30 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 9 18:53:30 2016 -0800
----------------------------------------------------------------------
.../streaming/StreamVisitorExample.java | 31 +++++++++++++-------
1 file changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a579d454/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
index 29781bc..c3d8c64 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
@@ -35,12 +35,16 @@ import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.stream.StreamVisitor;
/**
- * Stream random numbers into the streaming cache.
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
- * <li>Start streaming using {@link StreamVisitorExample}.</li>
- * </ul>
+ * This example demonstrates the stream visitor, which allows customizing the processing
+ * of the streamed data on the server side. Instead of populating the cache for which the
+ * streamer is created, we will calculate aggregated data on the fly and save results in
+ * another cache.
+ * <p>
+ * Remote nodes should always be started with a special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively, you can run {@link ExampleNodeStartup} in another JVM, which will
+ * start a node with the {@code examples/config/example-ignite.xml} configuration.
*/
public class StreamVisitorExample {
/** Random number generator. */
@@ -53,7 +57,7 @@ public class StreamVisitorExample {
private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50};
/** Caches' names. */
- private static final String CACHE_NAME = "instCache";
+ private static final String INSTRUMENTS_CACHE_NAME = "instCache";
private static final String MARKET_TICKS_CACHE_NAME = "marketTicks";
public static void main(String[] args) throws Exception {
@@ -65,7 +69,7 @@ public class StreamVisitorExample {
return;
// Financial instrument cache configuration.
- CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>(CACHE_NAME);
+ CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>(INSTRUMENTS_CACHE_NAME);
// Index key and value for querying financial instruments.
// Note that Instrument class has @QuerySqlField annotation for secondary field indexing.
@@ -77,6 +81,10 @@ public class StreamVisitorExample {
IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
) {
try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName())) {
+ // To achieve proper indexing we should use fully-qualified name
+ // of the class as a type name when binary object is created.
+ final String instTypeName = Instrument.class.getName();
+
// Note that we receive market data, but do not populate 'mktCache' (it remains empty).
// Instead we update the instruments in the 'instCache'.
// Since both, 'instCache' and 'mktCache' use the same key, updates are collocated.
@@ -85,14 +93,15 @@ public class StreamVisitorExample {
String symbol = e.getKey();
Double tick = e.getValue();
- IgniteCache<String, BinaryObject> binInstCache = ignite.cache("instCache").withKeepBinary();
+ IgniteCache<String, BinaryObject> binInstCache =
+ ignite.cache(INSTRUMENTS_CACHE_NAME).withKeepBinary();
BinaryObject inst = binInstCache.get(symbol);
BinaryObjectBuilder instBuilder;
if (inst == null) {
- instBuilder = ignite.binary().builder("Instrument");
+ instBuilder = ignite.binary().builder(instTypeName);
// Constructor logic.
instBuilder.setField(
@@ -146,7 +155,7 @@ public class StreamVisitorExample {
}
finally {
// Distributed cache could be removed from cluster only by #destroyCache() call.
- ignite.destroyCache(CACHE_NAME);
+ ignite.destroyCache(INSTRUMENTS_CACHE_NAME);
ignite.destroyCache(MARKET_TICKS_CACHE_NAME);
}
}