You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:29 UTC
[28/32] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/aa795a45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/aa795a45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/aa795a45
Branch: refs/heads/master
Commit: aa795a45f88530ab937e87d1ce4d17022133c1f4
Parents: 4794dd4
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:54:27 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:54:27 2014 +0300
----------------------------------------------------------------------
examples/config/example-streamer.xml | 6 +-
.../streaming/StreamingPriceBarsExample.java | 2 +-
.../router/StreamerAffinityEventRouter.java | 142 +++++++++++++++++++
.../StreamerCacheAffinityEventRouter.java | 62 ++++++++
.../router/StreamerLocalEventRouter.java | 38 +++++
.../router/StreamerRandomEventRouter.java | 81 +++++++++++
.../router/StreamerRoundRobinEventRouter.java | 45 ++++++
.../apache/ignite/streamer/router/package.html | 14 ++
.../processors/streamer/IgniteStreamerImpl.java | 2 +-
.../router/StreamerAffinityEventRouter.java | 142 -------------------
.../StreamerCacheAffinityEventRouter.java | 62 --------
.../router/StreamerLocalEventRouter.java | 38 -----
.../router/StreamerRandomEventRouter.java | 81 -----------
.../router/StreamerRoundRobinEventRouter.java | 45 ------
.../gridgain/grid/streamer/router/package.html | 14 --
.../average/spring-streamer-average-local.xml | 2 +-
.../average/spring-streamer-average-random.xml | 2 +-
.../streamer/GridStreamerSelfTest.java | 2 +-
18 files changed, 390 insertions(+), 390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml
index 5aa0fec..9879786 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -83,7 +83,7 @@
Use random event router to execute streamer stages on random nodes
just for demonstration purposes.
-->
- <bean class="org.gridgain.grid.streamer.router.StreamerRandomEventRouter"/>
+ <bean class="org.apache.ignite.streamer.router.StreamerRandomEventRouter"/>
<!--
<bean class="org.gridgain.grid.streamer.router.GridStreamerLocalEventRouter"/>
-->
@@ -141,7 +141,7 @@
numbers are pushed to wrong nodes, the counters will become
inaccurate.
-->
- <bean class="org.gridgain.grid.streamer.router.StreamerAffinityEventRouter"/>
+ <bean class="org.apache.ignite.streamer.router.StreamerAffinityEventRouter"/>
</property>
<!--
@@ -198,7 +198,7 @@
For this example we use affinity event router to make sure that
quotes for identical instruments are routed to the same node.
-->
- <bean class="org.gridgain.grid.streamer.router.StreamerAffinityEventRouter"/>
+ <bean class="org.apache.ignite.streamer.router.StreamerAffinityEventRouter"/>
</property>
</bean>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
index 6e07c4b..1e22a46 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
@@ -12,9 +12,9 @@ package org.gridgain.examples.streaming;
import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.router.*;
import org.gridgain.examples.*;
import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.router.*;
import org.jetbrains.annotations.*;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
new file mode 100644
index 0000000..7f08b55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
@@ -0,0 +1,142 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Router used to colocate identical streamer events or events with identical affinity
+ * key on the same node. Such collocation is often required to perform computations on
+ * multiple events together, for example, find number of occurrences of a word in some
+ * text. In this case you would collocate identical words together to make sure that
+ * you can update their counts.
+ * <h1 class="header">Affinity Key</h1>
+ * Affinity key for collocation of event together on the same node is specified
+ * via {@link AffinityEvent#affinityKey()} method. If event does not implement
+ * {@link AffinityEvent} interface, then event itself will be used to determine affinity.
+ */
+public class StreamerAffinityEventRouter extends StreamerEventRouterAdapter {
+ /** */
+ public static final int REPLICA_CNT = 128;
+
+ /**
+ * All events that implement this interface will be routed based on key affinity.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ public interface AffinityEvent {
+ /**
+ * @return Affinity route key for the event.
+ */
+ public Object affinityKey();
+ }
+
+ /** Grid instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** */
+ private final GridConsistentHash<UUID> nodeHash = new GridConsistentHash<>();
+
+ /** */
+ private Collection<UUID> addedNodes = new GridConcurrentHashSet<>();
+
+ /** {@inheritDoc} */
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
+ return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() :
+ evt, ctx);
+ }
+
+ /**
+ * @param obj Object.
+ * @param ctx Context.
+ * @return Rich node.
+ */
+ private ClusterNode node(Object obj, StreamerContext ctx) {
+ while (true) {
+ Collection<ClusterNode> nodes = ctx.projection().nodes();
+
+ assert nodes != null;
+ assert !nodes.isEmpty();
+
+ int nodesSize = nodes.size();
+
+ if (nodesSize == 1) { // Minor optimization.
+ ClusterNode ret = F.first(nodes);
+
+ assert ret != null;
+
+ return ret;
+ }
+
+ final Collection<UUID> lookup = U.newHashSet(nodesSize);
+
+ // Store nodes in map for fast lookup.
+ for (ClusterNode n : nodes)
+ // Add nodes into hash circle, if absent.
+ lookup.add(resolveNode(n));
+
+ // Cleanup circle.
+ if (lookup.size() != addedNodes.size()) {
+ Collection<UUID> rmv = null;
+
+ for (Iterator<UUID> iter = addedNodes.iterator(); iter.hasNext(); ) {
+ UUID id = iter.next();
+
+ if (!lookup.contains(id)) {
+ iter.remove();
+
+ if (rmv == null)
+ rmv = new ArrayList<>();
+
+ rmv.add(id);
+ }
+ }
+
+ if (!F.isEmpty(rmv))
+ nodeHash.removeNodes(rmv);
+ }
+
+ UUID nodeId = nodeHash.node(obj, lookup);
+
+ assert nodeId != null;
+
+ ClusterNode node = ctx.projection().node(nodeId);
+
+ if (node != null)
+ return node;
+ }
+ }
+
+ /**
+ * Add node to hash circle if this is the first node invocation.
+ *
+ * @param n Node to get info for.
+ * @return Node ID.
+ */
+ private UUID resolveNode(ClusterNode n) {
+ UUID nodeId = n.id();
+
+ if (!addedNodes.contains(nodeId)) {
+ addedNodes.add(nodeId);
+
+ nodeHash.addNode(nodeId, REPLICA_CNT);
+ }
+
+ return nodeId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
new file mode 100644
index 0000000..b618fae
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
@@ -0,0 +1,62 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Router used to colocate streamer events with data stored in a partitioned cache.
+ * <h1 class="header">Affinity Key</h1>
+ * Affinity key for collocation of event together on the same node is specified
+ * via {@link CacheAffinityEvent#affinityKey()} method. If event does not implement
+ * {@link CacheAffinityEvent} interface, then event will be routed always to local node.
+ */
+public class StreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter {
+ /**
+ * All events that implement this interface will be routed based on key affinity.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ public interface CacheAffinityEvent {
+ /**
+ * @return Affinity route key for the event.
+ */
+ public Object affinityKey();
+
+ /**
+ * @return Cache name, if {@code null}, the default cache is used.
+ */
+ @Nullable public String cacheName();
+ }
+
+ /** Grid instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
+ if (evt instanceof CacheAffinityEvent) {
+ CacheAffinityEvent e = (CacheAffinityEvent)evt;
+
+ GridCache<Object, Object> c = ((GridEx) ignite).cachex(e.cacheName());
+
+ assert c != null;
+
+ return c.affinity().mapKeyToNode(e.affinityKey());
+ }
+
+ return ignite.cluster().localNode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
new file mode 100644
index 0000000..6109fb5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
@@ -0,0 +1,38 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.util.typedef.*;
+
+import java.util.*;
+
+/**
+ * Local router. Always routes event to local node.
+ */
+public class StreamerLocalEventRouter implements StreamerEventRouter {
+ /** Grid instance. */
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
+ return ignite.cluster().localNode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
+ Collection<T> evts) {
+ return F.asMap(ignite.cluster().localNode(), evts);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
new file mode 100644
index 0000000..c1f28b9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
@@ -0,0 +1,81 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.util.typedef.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Random router. Routes event to random node.
+ */
+public class StreamerRandomEventRouter extends StreamerEventRouterAdapter {
+ /** Optional predicates to exclude nodes from routing. */
+ private IgnitePredicate<ClusterNode>[] predicates;
+
+ /**
+ * Empty constructor for spring.
+ */
+ public StreamerRandomEventRouter() {
+ this((IgnitePredicate<ClusterNode>[])null);
+ }
+
+ /**
+ * Constructs random event router with optional set of filters to apply to streamer projection.
+ *
+ * @param predicates Node predicates.
+ */
+ public StreamerRandomEventRouter(@Nullable IgnitePredicate<ClusterNode>... predicates) {
+ this.predicates = predicates;
+ }
+
+ /**
+ * Constructs random event router with optional set of filters to apply to streamer projection.
+ *
+ * @param predicates Node predicates.
+ */
+ @SuppressWarnings("unchecked")
+ public StreamerRandomEventRouter(Collection<IgnitePredicate<ClusterNode>> predicates) {
+ if (!F.isEmpty(predicates)) {
+ this.predicates = new IgnitePredicate[predicates.size()];
+
+ predicates.toArray(this.predicates);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
+ Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates);
+
+ if (F.isEmpty(nodes))
+ return null;
+
+ int idx = ThreadLocalRandom8.current().nextInt(nodes.size());
+
+ int i = 0;
+
+ Iterator<ClusterNode> iter = nodes.iterator();
+
+ while (true) {
+ if (!iter.hasNext())
+ iter = nodes.iterator();
+
+ ClusterNode node = iter.next();
+
+ if (idx == i++)
+ return node;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
new file mode 100644
index 0000000..bf8d981
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
@@ -0,0 +1,45 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.streamer.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Round robin router.
+ */
+public class StreamerRoundRobinEventRouter extends StreamerEventRouterAdapter {
+ /** */
+ private final AtomicLong lastOrder = new AtomicLong();
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
+ Collection<ClusterNode> nodes = ctx.projection().nodes();
+
+ int idx = (int)(lastOrder.getAndIncrement() % nodes.size());
+
+ int i = 0;
+
+ Iterator<ClusterNode> iter = nodes.iterator();
+
+ while (true) {
+ if (!iter.hasNext())
+ iter = nodes.iterator();
+
+ ClusterNode node = iter.next();
+
+ if (idx == i++)
+ return node;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html b/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html
new file mode 100644
index 0000000..9416d24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html
@@ -0,0 +1,14 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+ @html.file.header
+ _________ _____ __________________ _____
+ __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+-->
+<html>
+<body>
+ Contains streamer event router implementations.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
index 3586b07..0861c10 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
@@ -14,13 +14,13 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.router.*;
import org.apache.ignite.thread.*;
import org.gridgain.grid.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.managers.communication.*;
import org.gridgain.grid.kernal.managers.deployment.*;
import org.gridgain.grid.kernal.managers.eventstorage.*;
-import org.gridgain.grid.streamer.router.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.future.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java
deleted file mode 100644
index 35b7d08..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Router used to colocate identical streamer events or events with identical affinity
- * key on the same node. Such collocation is often required to perform computations on
- * multiple events together, for example, find number of occurrences of a word in some
- * text. In this case you would collocate identical words together to make sure that
- * you can update their counts.
- * <h1 class="header">Affinity Key</h1>
- * Affinity key for collocation of event together on the same node is specified
- * via {@link AffinityEvent#affinityKey()} method. If event does not implement
- * {@link AffinityEvent} interface, then event itself will be used to determine affinity.
- */
-public class StreamerAffinityEventRouter extends StreamerEventRouterAdapter {
- /** */
- public static final int REPLICA_CNT = 128;
-
- /**
- * All events that implement this interface will be routed based on key affinity.
- */
- @SuppressWarnings("PublicInnerClass")
- public interface AffinityEvent {
- /**
- * @return Affinity route key for the event.
- */
- public Object affinityKey();
- }
-
- /** Grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** */
- private final GridConsistentHash<UUID> nodeHash = new GridConsistentHash<>();
-
- /** */
- private Collection<UUID> addedNodes = new GridConcurrentHashSet<>();
-
- /** {@inheritDoc} */
- @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
- return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() :
- evt, ctx);
- }
-
- /**
- * @param obj Object.
- * @param ctx Context.
- * @return Rich node.
- */
- private ClusterNode node(Object obj, StreamerContext ctx) {
- while (true) {
- Collection<ClusterNode> nodes = ctx.projection().nodes();
-
- assert nodes != null;
- assert !nodes.isEmpty();
-
- int nodesSize = nodes.size();
-
- if (nodesSize == 1) { // Minor optimization.
- ClusterNode ret = F.first(nodes);
-
- assert ret != null;
-
- return ret;
- }
-
- final Collection<UUID> lookup = U.newHashSet(nodesSize);
-
- // Store nodes in map for fast lookup.
- for (ClusterNode n : nodes)
- // Add nodes into hash circle, if absent.
- lookup.add(resolveNode(n));
-
- // Cleanup circle.
- if (lookup.size() != addedNodes.size()) {
- Collection<UUID> rmv = null;
-
- for (Iterator<UUID> iter = addedNodes.iterator(); iter.hasNext(); ) {
- UUID id = iter.next();
-
- if (!lookup.contains(id)) {
- iter.remove();
-
- if (rmv == null)
- rmv = new ArrayList<>();
-
- rmv.add(id);
- }
- }
-
- if (!F.isEmpty(rmv))
- nodeHash.removeNodes(rmv);
- }
-
- UUID nodeId = nodeHash.node(obj, lookup);
-
- assert nodeId != null;
-
- ClusterNode node = ctx.projection().node(nodeId);
-
- if (node != null)
- return node;
- }
- }
-
- /**
- * Add node to hash circle if this is the first node invocation.
- *
- * @param n Node to get info for.
- * @return Node ID.
- */
- private UUID resolveNode(ClusterNode n) {
- UUID nodeId = n.id();
-
- if (!addedNodes.contains(nodeId)) {
- addedNodes.add(nodeId);
-
- nodeHash.addNode(nodeId, REPLICA_CNT);
- }
-
- return nodeId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java
deleted file mode 100644
index a674bae..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Router used to colocate streamer events with data stored in a partitioned cache.
- * <h1 class="header">Affinity Key</h1>
- * Affinity key for collocation of event together on the same node is specified
- * via {@link CacheAffinityEvent#affinityKey()} method. If event does not implement
- * {@link CacheAffinityEvent} interface, then event will be routed always to local node.
- */
-public class StreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter {
- /**
- * All events that implement this interface will be routed based on key affinity.
- */
- @SuppressWarnings("PublicInnerClass")
- public interface CacheAffinityEvent {
- /**
- * @return Affinity route key for the event.
- */
- public Object affinityKey();
-
- /**
- * @return Cache name, if {@code null}, the default cache is used.
- */
- @Nullable public String cacheName();
- }
-
- /** Grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** {@inheritDoc} */
- @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
- if (evt instanceof CacheAffinityEvent) {
- CacheAffinityEvent e = (CacheAffinityEvent)evt;
-
- GridCache<Object, Object> c = ((GridEx) ignite).cachex(e.cacheName());
-
- assert c != null;
-
- return c.affinity().mapKeyToNode(e.affinityKey());
- }
-
- return ignite.cluster().localNode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java
deleted file mode 100644
index 961a722..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.util.typedef.*;
-
-import java.util.*;
-
-/**
- * Local router. Always routes event to local node.
- */
-public class StreamerLocalEventRouter implements StreamerEventRouter {
- /** Grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** {@inheritDoc} */
- @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
- return ignite.cluster().localNode();
- }
-
- /** {@inheritDoc} */
- @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
- Collection<T> evts) {
- return F.asMap(ignite.cluster().localNode(), evts);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java
deleted file mode 100644
index ab8c082..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.util.typedef.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Random router. Routes event to random node.
- */
-public class StreamerRandomEventRouter extends StreamerEventRouterAdapter {
- /** Optional predicates to exclude nodes from routing. */
- private IgnitePredicate<ClusterNode>[] predicates;
-
- /**
- * Empty constructor for spring.
- */
- public StreamerRandomEventRouter() {
- this((IgnitePredicate<ClusterNode>[])null);
- }
-
- /**
- * Constructs random event router with optional set of filters to apply to streamer projection.
- *
- * @param predicates Node predicates.
- */
- public StreamerRandomEventRouter(@Nullable IgnitePredicate<ClusterNode>... predicates) {
- this.predicates = predicates;
- }
-
- /**
- * Constructs random event router with optional set of filters to apply to streamer projection.
- *
- * @param predicates Node predicates.
- */
- @SuppressWarnings("unchecked")
- public StreamerRandomEventRouter(Collection<IgnitePredicate<ClusterNode>> predicates) {
- if (!F.isEmpty(predicates)) {
- this.predicates = new IgnitePredicate[predicates.size()];
-
- predicates.toArray(this.predicates);
- }
- }
-
- /** {@inheritDoc} */
- @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
- Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates);
-
- if (F.isEmpty(nodes))
- return null;
-
- int idx = ThreadLocalRandom8.current().nextInt(nodes.size());
-
- int i = 0;
-
- Iterator<ClusterNode> iter = nodes.iterator();
-
- while (true) {
- if (!iter.hasNext())
- iter = nodes.iterator();
-
- ClusterNode node = iter.next();
-
- if (idx == i++)
- return node;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java
deleted file mode 100644
index 83351d0..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Round robin router.
- */
-public class StreamerRoundRobinEventRouter extends StreamerEventRouterAdapter {
- /** */
- private final AtomicLong lastOrder = new AtomicLong();
-
- /** {@inheritDoc} */
- @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
- Collection<ClusterNode> nodes = ctx.projection().nodes();
-
- int idx = (int)(lastOrder.getAndIncrement() % nodes.size());
-
- int i = 0;
-
- Iterator<ClusterNode> iter = nodes.iterator();
-
- while (true) {
- if (!iter.hasNext())
- iter = nodes.iterator();
-
- ClusterNode node = iter.next();
-
- if (idx == i++)
- return node;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html b/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html
deleted file mode 100644
index 9416d24..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html
+++ /dev/null
@@ -1,14 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<!--
- @html.file.header
- _________ _____ __________________ _____
- __ ____/___________(_)______ /__ ____/______ ____(_)_______
- _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
--->
-<html>
-<body>
- Contains streamer event router implementations.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
index 8b05527..4064075 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
@@ -28,5 +28,5 @@
<import resource="spring-streamer-average-base.xml"/>
<!-- Local router configuration. -->
- <bean id="router.cfg" class="org.gridgain.grid.streamer.router.StreamerLocalEventRouter"/>
+ <bean id="router.cfg" class="org.apache.ignite.streamer.router.StreamerLocalEventRouter"/>
</beans>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
index bc8061f..2abe01a 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
@@ -28,5 +28,5 @@
<import resource="spring-streamer-average-base.xml"/>
<!-- Local router configuration. -->
- <bean id="router.cfg" class="org.gridgain.grid.streamer.router.StreamerRandomEventRouter"/>
+ <bean id="router.cfg" class="org.apache.ignite.streamer.router.StreamerRandomEventRouter"/>
</beans>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
index 763c060..6d3d3d3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
@@ -16,12 +16,12 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.router.*;
import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.spi.discovery.tcp.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.streamer.router.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.*;