You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:29 UTC

[28/32] incubator-ignite git commit: # Renaming

# Renaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/aa795a45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/aa795a45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/aa795a45

Branch: refs/heads/master
Commit: aa795a45f88530ab937e87d1ce4d17022133c1f4
Parents: 4794dd4
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:54:27 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:54:27 2014 +0300

----------------------------------------------------------------------
 examples/config/example-streamer.xml            |   6 +-
 .../streaming/StreamingPriceBarsExample.java    |   2 +-
 .../router/StreamerAffinityEventRouter.java     | 142 +++++++++++++++++++
 .../StreamerCacheAffinityEventRouter.java       |  62 ++++++++
 .../router/StreamerLocalEventRouter.java        |  38 +++++
 .../router/StreamerRandomEventRouter.java       |  81 +++++++++++
 .../router/StreamerRoundRobinEventRouter.java   |  45 ++++++
 .../apache/ignite/streamer/router/package.html  |  14 ++
 .../processors/streamer/IgniteStreamerImpl.java |   2 +-
 .../router/StreamerAffinityEventRouter.java     | 142 -------------------
 .../StreamerCacheAffinityEventRouter.java       |  62 --------
 .../router/StreamerLocalEventRouter.java        |  38 -----
 .../router/StreamerRandomEventRouter.java       |  81 -----------
 .../router/StreamerRoundRobinEventRouter.java   |  45 ------
 .../gridgain/grid/streamer/router/package.html  |  14 --
 .../average/spring-streamer-average-local.xml   |   2 +-
 .../average/spring-streamer-average-random.xml  |   2 +-
 .../streamer/GridStreamerSelfTest.java          |   2 +-
 18 files changed, 390 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml
index 5aa0fec..9879786 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -83,7 +83,7 @@
                             Use random event router to execute streamer stages on random nodes
                             just for demonstration purposes.
                         -->
-                        <bean class="org.gridgain.grid.streamer.router.StreamerRandomEventRouter"/>
+                        <bean class="org.apache.ignite.streamer.router.StreamerRandomEventRouter"/>
                         <!--
                         <bean class="org.gridgain.grid.streamer.router.GridStreamerLocalEventRouter"/>
                         -->
@@ -141,7 +141,7 @@
                             numbers are pushed to wrong nodes, the counters will become
                             inaccurate.
                         -->
-                        <bean class="org.gridgain.grid.streamer.router.StreamerAffinityEventRouter"/>
+                        <bean class="org.apache.ignite.streamer.router.StreamerAffinityEventRouter"/>
                     </property>
 
                     <!--
@@ -198,7 +198,7 @@
                             For this example we use affinity event router to make sure that
                             quotes for identical instruments are routed to the same node.
                         -->
-                        <bean class="org.gridgain.grid.streamer.router.StreamerAffinityEventRouter"/>
+                        <bean class="org.apache.ignite.streamer.router.StreamerAffinityEventRouter"/>
                     </property>
                 </bean>
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
index 6e07c4b..1e22a46 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
@@ -12,9 +12,9 @@ package org.gridgain.examples.streaming;
 import org.apache.ignite.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.router.*;
 import org.gridgain.examples.*;
 import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.router.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
new file mode 100644
index 0000000..7f08b55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerAffinityEventRouter.java
@@ -0,0 +1,142 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Router used to colocate identical streamer events or events with identical affinity
+ * key on the same node. Such collocation is often required to perform computations on
+ * multiple events together, for example, find number of occurrences of a word in some
+ * text. In this case you would collocate identical words together to make sure that
+ * you can update their counts.
+ * <h1 class="header">Affinity Key</h1>
+ * The affinity key used to collocate events on the same node is specified
+ * via the {@link AffinityEvent#affinityKey()} method. If an event does not implement the
+ * {@link AffinityEvent} interface, then the event itself is used to determine affinity.
+ */
+public class StreamerAffinityEventRouter extends StreamerEventRouterAdapter {
+    /** */
+    public static final int REPLICA_CNT = 128;
+
+    /**
+     * All events that implement this interface will be routed based on key affinity.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public interface AffinityEvent {
+        /**
+         * @return Affinity route key for the event.
+         */
+        public Object affinityKey();
+    }
+
+    /** Grid instance. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** */
+    private final GridConsistentHash<UUID> nodeHash = new GridConsistentHash<>();
+
+    /** */
+    private Collection<UUID> addedNodes = new GridConcurrentHashSet<>();
+
+    /** {@inheritDoc} */
+    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
+        return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() :
+            evt, ctx);
+    }
+
+    /**
+     * @param obj Affinity key, or the event itself when it is not an {@link AffinityEvent}.
+     * @param ctx Streamer context whose projection supplies the candidate nodes.
+     * @return Node from the projection selected for {@code obj} by the consistent hash.
+     */
+    private ClusterNode node(Object obj, StreamerContext ctx) {
+        while (true) {
+            Collection<ClusterNode> nodes = ctx.projection().nodes();
+
+            assert nodes != null;
+            assert !nodes.isEmpty();
+
+            int nodesSize = nodes.size();
+
+            if (nodesSize == 1) { // Single node in the projection: no hashing needed.
+                ClusterNode ret = F.first(nodes);
+
+                assert ret != null;
+
+                return ret;
+            }
+
+            final Collection<UUID> lookup = U.newHashSet(nodesSize);
+
+            // Collect the IDs of the current projection nodes for fast lookup.
+            for (ClusterNode n : nodes)
+                // resolveNode() also adds the node into the hash circle, if absent.
+                lookup.add(resolveNode(n));
+
+            // Sizes differ: drop previously added IDs that are no longer in the projection.
+            if (lookup.size() != addedNodes.size()) {
+                Collection<UUID> rmv = null;
+
+                for (Iterator<UUID> iter = addedNodes.iterator(); iter.hasNext(); ) {
+                    UUID id = iter.next();
+
+                    if (!lookup.contains(id)) {
+                        iter.remove();
+
+                        if (rmv == null)
+                            rmv = new ArrayList<>();
+
+                        rmv.add(id);
+                    }
+                }
+
+                if (!F.isEmpty(rmv))
+                    nodeHash.removeNodes(rmv);
+            }
+
+            UUID nodeId = nodeHash.node(obj, lookup);
+
+            assert nodeId != null;
+
+            ClusterNode node = ctx.projection().node(nodeId);
+
+            if (node != null)
+                return node;
+        }
+    }
+
+    /**
+     * Adds the node to the hash circle the first time its ID is seen and
+     * records the ID in {@code addedNodes}.
+     *
+     * @param n Node to get info for.
+     * @return Node ID.
+     */
+    private UUID resolveNode(ClusterNode n) {
+        UUID nodeId = n.id();
+
+        // add() reports whether the ID was newly inserted, so the membership check and
+        // the insert are a single call rather than a separate contains()/add() pair.
+        if (addedNodes.add(nodeId))
+            nodeHash.addNode(nodeId, REPLICA_CNT);
+
+        return nodeId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
new file mode 100644
index 0000000..b618fae
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerCacheAffinityEventRouter.java
@@ -0,0 +1,62 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Router used to colocate streamer events with data stored in a partitioned cache.
+ * <h1 class="header">Affinity Key</h1>
+ * The affinity key used to collocate events on the same node is specified
+ * via the {@link CacheAffinityEvent#affinityKey()} method. If an event does not implement the
+ * {@link CacheAffinityEvent} interface, then the event is always routed to the local node.
+ */
+public class StreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter {
+    /**
+     * All events that implement this interface will be routed based on key affinity.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public interface CacheAffinityEvent {
+        /**
+         * @return Affinity route key for the event.
+         */
+        public Object affinityKey();
+
+        /**
+         * @return Cache name, if {@code null}, the default cache is used.
+         */
+        @Nullable public String cacheName();
+    }
+
+    /** Grid instance. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
+        if (evt instanceof CacheAffinityEvent) {
+            CacheAffinityEvent e = (CacheAffinityEvent)evt;
+
+            GridCache<Object, Object> c = ((GridEx) ignite).cachex(e.cacheName());
+
+            assert c != null;
+
+            return c.affinity().mapKeyToNode(e.affinityKey());
+        }
+
+        return ignite.cluster().localNode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
new file mode 100644
index 0000000..6109fb5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerLocalEventRouter.java
@@ -0,0 +1,38 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.util.typedef.*;
+
+import java.util.*;
+
+/**
+ * Local router. Always routes event to local node.
+ */
+public class StreamerLocalEventRouter implements StreamerEventRouter {
+    /** Grid instance. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
+        return ignite.cluster().localNode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
+        Collection<T> evts) {
+        return F.asMap(ignite.cluster().localNode(), evts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
new file mode 100644
index 0000000..c1f28b9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRandomEventRouter.java
@@ -0,0 +1,81 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.util.typedef.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Random router. Routes event to random node.
+ */
+public class StreamerRandomEventRouter extends StreamerEventRouterAdapter {
+    /** Optional predicates to exclude nodes from routing. */
+    private IgnitePredicate<ClusterNode>[] predicates;
+
+    /**
+     * Empty constructor for spring.
+     */
+    public StreamerRandomEventRouter() {
+        this((IgnitePredicate<ClusterNode>[])null);
+    }
+
+    /**
+     * Constructs random event router with optional set of filters to apply to streamer projection.
+     *
+     * @param predicates Node predicates.
+     */
+    public StreamerRandomEventRouter(@Nullable IgnitePredicate<ClusterNode>... predicates) {
+        this.predicates = predicates;
+    }
+
+    /**
+     * Constructs random event router with optional set of filters to apply to streamer projection.
+     *
+     * @param predicates Node predicates.
+     */
+    @SuppressWarnings("unchecked")
+    public StreamerRandomEventRouter(Collection<IgnitePredicate<ClusterNode>> predicates) {
+        if (!F.isEmpty(predicates)) {
+            this.predicates = new IgnitePredicate[predicates.size()];
+
+            predicates.toArray(this.predicates);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
+        Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates);
+
+        if (F.isEmpty(nodes))
+            return null;
+
+        int idx = ThreadLocalRandom8.current().nextInt(nodes.size());
+
+        int i = 0;
+
+        Iterator<ClusterNode> iter = nodes.iterator();
+
+        while (true) {
+            if (!iter.hasNext())
+                iter = nodes.iterator();
+
+            ClusterNode node = iter.next();
+
+            if (idx == i++)
+                return node;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
new file mode 100644
index 0000000..bf8d981
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/StreamerRoundRobinEventRouter.java
@@ -0,0 +1,45 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.router;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.streamer.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Round robin router.
+ */
+public class StreamerRoundRobinEventRouter extends StreamerEventRouterAdapter {
+    /** */
+    private final AtomicLong lastOrder = new AtomicLong();
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
+        Collection<ClusterNode> nodes = ctx.projection().nodes();
+
+        // Empty projection: return null, as StreamerRandomEventRouter does, rather than divide by zero.
+        if (nodes.isEmpty())
+            return null;
+
+        int idx = (int)(lastOrder.getAndIncrement() % nodes.size());
+        int i = 0;
+
+        for (Iterator<ClusterNode> iter = nodes.iterator(); ; ) {
+            if (!iter.hasNext())
+                iter = nodes.iterator();
+
+            ClusterNode node = iter.next();
+            if (idx == i++)
+                return node;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html b/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html
new file mode 100644
index 0000000..9416d24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/router/package.html
@@ -0,0 +1,14 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+    @html.file.header
+    _________        _____ __________________        _____
+    __  ____/___________(_)______  /__  ____/______ ____(_)_______
+    _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+    / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+    \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+-->
+<html>
+<body>
+    Contains streamer event router implementations.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
index 3586b07..0861c10 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
@@ -14,13 +14,13 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.router.*;
 import org.apache.ignite.thread.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.managers.communication.*;
 import org.gridgain.grid.kernal.managers.deployment.*;
 import org.gridgain.grid.kernal.managers.eventstorage.*;
-import org.gridgain.grid.streamer.router.*;
 import org.gridgain.grid.util.*;
 import org.gridgain.grid.util.direct.*;
 import org.gridgain.grid.util.future.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java
deleted file mode 100644
index 35b7d08..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerAffinityEventRouter.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.util.*;
-
-/**
- * Router used to colocate identical streamer events or events with identical affinity
- * key on the same node. Such collocation is often required to perform computations on
- * multiple events together, for example, find number of occurrences of a word in some
- * text. In this case you would collocate identical words together to make sure that
- * you can update their counts.
- * <h1 class="header">Affinity Key</h1>
- * Affinity key for collocation of event together on the same node is specified
- * via {@link AffinityEvent#affinityKey()} method. If event does not implement
- * {@link AffinityEvent} interface, then event itself will be used to determine affinity.
- */
-public class StreamerAffinityEventRouter extends StreamerEventRouterAdapter {
-    /** */
-    public static final int REPLICA_CNT = 128;
-
-    /**
-     * All events that implement this interface will be routed based on key affinity.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public interface AffinityEvent {
-        /**
-         * @return Affinity route key for the event.
-         */
-        public Object affinityKey();
-    }
-
-    /** Grid instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** */
-    private final GridConsistentHash<UUID> nodeHash = new GridConsistentHash<>();
-
-    /** */
-    private Collection<UUID> addedNodes = new GridConcurrentHashSet<>();
-
-    /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-        return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() :
-            evt, ctx);
-    }
-
-    /**
-     * @param obj Object.
-     * @param ctx Context.
-     * @return Rich node.
-     */
-    private ClusterNode node(Object obj, StreamerContext ctx) {
-        while (true) {
-            Collection<ClusterNode> nodes = ctx.projection().nodes();
-
-            assert nodes != null;
-            assert !nodes.isEmpty();
-
-            int nodesSize = nodes.size();
-
-            if (nodesSize == 1) { // Minor optimization.
-                ClusterNode ret = F.first(nodes);
-
-                assert ret != null;
-
-                return ret;
-            }
-
-            final Collection<UUID> lookup = U.newHashSet(nodesSize);
-
-            // Store nodes in map for fast lookup.
-            for (ClusterNode n : nodes)
-                // Add nodes into hash circle, if absent.
-                lookup.add(resolveNode(n));
-
-            // Cleanup circle.
-            if (lookup.size() != addedNodes.size()) {
-                Collection<UUID> rmv = null;
-
-                for (Iterator<UUID> iter = addedNodes.iterator(); iter.hasNext(); ) {
-                    UUID id = iter.next();
-
-                    if (!lookup.contains(id)) {
-                        iter.remove();
-
-                        if (rmv == null)
-                            rmv = new ArrayList<>();
-
-                        rmv.add(id);
-                    }
-                }
-
-                if (!F.isEmpty(rmv))
-                    nodeHash.removeNodes(rmv);
-            }
-
-            UUID nodeId = nodeHash.node(obj, lookup);
-
-            assert nodeId != null;
-
-            ClusterNode node = ctx.projection().node(nodeId);
-
-            if (node != null)
-                return node;
-        }
-    }
-
-    /**
-     * Add node to hash circle if this is the first node invocation.
-     *
-     * @param n Node to get info for.
-     * @return Node ID.
-     */
-    private UUID resolveNode(ClusterNode n) {
-        UUID nodeId = n.id();
-
-        if (!addedNodes.contains(nodeId)) {
-            addedNodes.add(nodeId);
-
-            nodeHash.addNode(nodeId, REPLICA_CNT);
-        }
-
-        return nodeId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java
deleted file mode 100644
index a674bae..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerCacheAffinityEventRouter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.cache.*;
-import org.gridgain.grid.kernal.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Router used to colocate streamer events with data stored in a partitioned cache.
- * <h1 class="header">Affinity Key</h1>
- * Affinity key for collocation of event together on the same node is specified
- * via {@link CacheAffinityEvent#affinityKey()} method. If event does not implement
- * {@link CacheAffinityEvent} interface, then event will be routed always to local node.
- */
-public class StreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter {
-    /**
-     * All events that implement this interface will be routed based on key affinity.
-     */
-    @SuppressWarnings("PublicInnerClass")
-    public interface CacheAffinityEvent {
-        /**
-         * @return Affinity route key for the event.
-         */
-        public Object affinityKey();
-
-        /**
-         * @return Cache name, if {@code null}, the default cache is used.
-         */
-        @Nullable public String cacheName();
-    }
-
-    /** Grid instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-        if (evt instanceof CacheAffinityEvent) {
-            CacheAffinityEvent e = (CacheAffinityEvent)evt;
-
-            GridCache<Object, Object> c = ((GridEx) ignite).cachex(e.cacheName());
-
-            assert c != null;
-
-            return c.affinity().mapKeyToNode(e.affinityKey());
-        }
-
-        return ignite.cluster().localNode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java
deleted file mode 100644
index 961a722..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerLocalEventRouter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.util.typedef.*;
-
-import java.util.*;
-
-/**
- * Local router. Always routes event to local node.
- */
-public class StreamerLocalEventRouter implements StreamerEventRouter {
-    /** Grid instance. */
-    @IgniteInstanceResource
-    private Ignite ignite;
-
-    /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
-        return ignite.cluster().localNode();
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
-        Collection<T> evts) {
-        return F.asMap(ignite.cluster().localNode(), evts);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java
deleted file mode 100644
index ab8c082..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRandomEventRouter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.util.typedef.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Random router. Routes event to random node.
- */
-public class StreamerRandomEventRouter extends StreamerEventRouterAdapter {
-    /** Optional predicates to exclude nodes from routing. */
-    private IgnitePredicate<ClusterNode>[] predicates;
-
-    /**
-     * Empty constructor for spring.
-     */
-    public StreamerRandomEventRouter() {
-        this((IgnitePredicate<ClusterNode>[])null);
-    }
-
-    /**
-     * Constructs random event router with optional set of filters to apply to streamer projection.
-     *
-     * @param predicates Node predicates.
-     */
-    public StreamerRandomEventRouter(@Nullable IgnitePredicate<ClusterNode>... predicates) {
-        this.predicates = predicates;
-    }
-
-    /**
-     * Constructs random event router with optional set of filters to apply to streamer projection.
-     *
-     * @param predicates Node predicates.
-     */
-    @SuppressWarnings("unchecked")
-    public StreamerRandomEventRouter(Collection<IgnitePredicate<ClusterNode>> predicates) {
-        if (!F.isEmpty(predicates)) {
-            this.predicates = new IgnitePredicate[predicates.size()];
-
-            predicates.toArray(this.predicates);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
-        Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates);
-
-        if (F.isEmpty(nodes))
-            return null;
-
-        int idx = ThreadLocalRandom8.current().nextInt(nodes.size());
-
-        int i = 0;
-
-        Iterator<ClusterNode> iter = nodes.iterator();
-
-        while (true) {
-            if (!iter.hasNext())
-                iter = nodes.iterator();
-
-            ClusterNode node = iter.next();
-
-            if (idx == i++)
-                return node;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java
deleted file mode 100644
index 83351d0..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/StreamerRoundRobinEventRouter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.router;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.streamer.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Round robin router.
- */
-public class StreamerRoundRobinEventRouter extends StreamerEventRouterAdapter {
-    /** */
-    private final AtomicLong lastOrder = new AtomicLong();
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
-        Collection<ClusterNode> nodes = ctx.projection().nodes();
-
-        int idx = (int)(lastOrder.getAndIncrement() % nodes.size());
-
-        int i = 0;
-
-        Iterator<ClusterNode> iter = nodes.iterator();
-
-        while (true) {
-            if (!iter.hasNext())
-                iter = nodes.iterator();
-
-            ClusterNode node = iter.next();
-
-            if (idx == i++)
-                return node;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html b/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html
deleted file mode 100644
index 9416d24..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/package.html
+++ /dev/null
@@ -1,14 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<!--
-    @html.file.header
-    _________        _____ __________________        _____
-    __  ____/___________(_)______  /__  ____/______ ____(_)_______
-    _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
-    / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
-    \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
--->
-<html>
-<body>
-    Contains streamer event router implementations.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
index 8b05527..4064075 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-local.xml
@@ -28,5 +28,5 @@
     <import resource="spring-streamer-average-base.xml"/>
 
     <!-- Local router configuration. -->
-    <bean id="router.cfg" class="org.gridgain.grid.streamer.router.StreamerLocalEventRouter"/>
+    <bean id="router.cfg" class="org.apache.ignite.streamer.router.StreamerLocalEventRouter"/>
 </beans>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
index bc8061f..2abe01a 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-random.xml
@@ -28,5 +28,5 @@
     <import resource="spring-streamer-average-base.xml"/>
 
     <!-- Local router configuration. -->
-    <bean id="router.cfg" class="org.gridgain.grid.streamer.router.StreamerRandomEventRouter"/>
+    <bean id="router.cfg" class="org.apache.ignite.streamer.router.StreamerRandomEventRouter"/>
 </beans>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/aa795a45/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
index 763c060..6d3d3d3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
@@ -16,12 +16,12 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.router.*;
 import org.apache.ignite.streamer.window.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.spi.discovery.tcp.*;
 import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
 import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.streamer.router.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 import org.gridgain.testframework.*;