You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:08 UTC

[07/32] incubator-ignite git commit: # Renaming

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java
new file mode 100644
index 0000000..1e5cf5f
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java
@@ -0,0 +1,161 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.apache.ignite.mbean.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Streamer MBean interface.
+ */
+@IgniteMBeanDescription("MBean that provides access to streamer description and metrics.")
+public interface StreamerMBean {
+    /**
+     * Gets streamer name.
+     *
+     * @return Streamer name.
+     */
+    @IgniteMBeanDescription("Streamer name.")
+    @Nullable public String getName();
+
+    /**
+     * Gets {@code atLeastOnce} configuration flag.
+     *
+     * @return {@code True} if {@code atLeastOnce} is configured.
+     */
+    @IgniteMBeanDescription("True if atLeastOnce is configured.")
+    public boolean isAtLeastOnce();
+
+    /**
+     * Gets size of stage futures map. This map is maintained only when {@code atLeastOnce} configuration
+     * flag is set to true.
+     *
+     * @return Stage future map size.
+     */
+    @IgniteMBeanDescription("Stage future map size.")
+    public int getStageFutureMapSize();
+
+    /**
+     * Gets size of batch futures map.
+     *
+     * @return Batch future map size.
+     */
+    @IgniteMBeanDescription("Batch future map size.")
+    public int getBatchFutureMapSize();
+
+    /**
+     * Gets number of stages currently being executed in streamer pool.
+     *
+     * @return Number of stages. Cannot be more than pool thread count.
+     */
+    @IgniteMBeanDescription("Number of stages currently being executed in streamer pool.")
+    public int getStageActiveExecutionCount();
+
+    /**
+     * Gets number of event batches currently waiting to be executed.
+     *
+     * @return Number of event batches waiting to be processed.
+     */
+    @IgniteMBeanDescription("Number of event batches currently waiting to be executed.")
+    public int getStageWaitingExecutionCount();
+
+    /**
+     * Gets total number of stages executed since last reset.
+     *
+     * @return Total number of stages executed since last reset.
+     */
+    @IgniteMBeanDescription("Total number of stages executed since last reset.")
+    public long getStageTotalExecutionCount();
+
+    /**
+     * Gets pipeline maximum execution time, i.e. time between execution start and time when last stage in pipeline
+     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+     * recorded independently.
+     *
+     * @return Pipeline maximum execution time.
+     */
+    @IgniteMBeanDescription("Pipeline maximum execution time.")
+    public long getPipelineMaximumExecutionTime();
+
+    /**
+     * Gets pipeline minimum execution time, i.e. time between execution start and time when last stage in pipeline
+     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+     * recorded independently.
+     *
+     * @return Pipeline minimum execution time.
+     */
+    @IgniteMBeanDescription("Pipeline minimum execution time.")
+    public long getPipelineMinimumExecutionTime();
+
+    /**
+     * Gets pipeline average execution time, i.e. time between execution start and time when last stage in pipeline
+     * returned empty map. If pipeline execution was split, metrics for each split will be recorded independently.
+     *
+     * @return Pipeline average execution time.
+     */
+    @IgniteMBeanDescription("Pipeline average execution time.")
+    public long getPipelineAverageExecutionTime();
+
+    /**
+     * Gets maximum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Maximum number of unique nodes in pipeline execution.
+     */
+    @IgniteMBeanDescription("Maximum number of unique nodes participated in pipeline execution.")
+    public int getPipelineMaximumExecutionNodes();
+
+    /**
+     * Gets minimum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Minimum number of unique nodes in pipeline execution.
+     */
+    @IgniteMBeanDescription("Minimum number of unique nodes participated in pipeline execution.")
+    public int getPipelineMinimumExecutionNodes();
+
+    /**
+     * Gets average number of unique nodes participated in pipeline execution. If pipeline execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Average number of unique nodes in pipeline execution.
+     */
+    @IgniteMBeanDescription("Average number of unique nodes participated in pipeline execution.")
+    public int getPipelineAverageExecutionNodes();
+
+    /**
+     * Gets number of current active sessions. Since event execution sessions are tracked only when
+     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+     *
+     * @return Number of current active sessions.
+     */
+    @IgniteMBeanDescription("Number of current active sessions.")
+    public int getCurrentActiveSessions();
+
+    /**
+     * Gets maximum number of active sessions since last reset. Since event execution sessions are tracked only when
+     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+     *
+     * @return Maximum active sessions since last reset.
+     */
+    @IgniteMBeanDescription("Maximum number of active sessions since last reset.")
+    public int getMaximumActiveSessions();
+
+    /**
+     * Gets number of failures since last reset. If {@code atLeastOnce} flag is set in streamer configuration,
+     * then only root node failures will be counted. Otherwise each node will count failures independently.
+     *
+     * @return Failures count.
+     */
+    @IgniteMBeanDescription("Number of failures since last reset.")
+    public int getFailuresCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
new file mode 100644
index 0000000..eaeb2f1
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
@@ -0,0 +1,201 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import java.util.*;
+
+/**
+ * Streamer metrics.
+ */
+public interface StreamerMetrics {
+    /**
+     * Gets number of stages currently being executed in streamer pool.
+     *
+     * @return Number of stages. Cannot be more than pool thread count.
+     */
+    public int stageActiveExecutionCount();
+
+    /**
+     * Gets number of event batches currently waiting to be executed.
+     *
+     * @return Number of event batches waiting to be processed.
+     */
+    public int stageWaitingExecutionCount();
+
+    /**
+     * Gets total number of stages executed since last reset.
+     *
+     * @return Total number of stages executed since last reset.
+     */
+    public long stageTotalExecutionCount();
+
+    /**
+     * Gets pipeline maximum execution time, i.e. time between execution start and time when last stage in pipeline
+     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+     * recorded independently.
+     *
+     * @return Pipeline maximum execution time.
+     */
+    public long pipelineMaximumExecutionTime();
+
+    /**
+     * Gets pipeline minimum execution time, i.e. time between execution start and time when last stage in pipeline
+     * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+     * recorded independently.
+     *
+     * @return Pipeline minimum execution time.
+     */
+    public long pipelineMinimumExecutionTime();
+
+    /**
+     * Gets pipeline average execution time, i.e. time between execution start and time when last stage in pipeline
+     * returned empty map. If pipeline execution was split, metrics for each split will be recorded independently.
+     *
+     * @return Pipeline average execution time.
+     */
+    public long pipelineAverageExecutionTime();
+
+    /**
+     * Gets maximum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Maximum number of unique nodes in pipeline execution.
+     */
+    public int pipelineMaximumExecutionNodes();
+
+    /**
+     * Gets minimum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Minimum number of unique nodes in pipeline execution.
+     */
+    public int pipelineMinimumExecutionNodes();
+
+    /**
+     * Gets average number of unique nodes participated in pipeline execution. If pipeline execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Average number of unique nodes in pipeline execution.
+     */
+    public int pipelineAverageExecutionNodes();
+
+    /**
+     * Gets query maximum execution time. If query execution was split to different nodes, metrics for each split
+     * will be recorded independently.
+     *
+     * @return Query maximum execution time.
+     */
+    public long queryMaximumExecutionTime();
+
+    /**
+     * Gets query minimum execution time. If query execution was split to different nodes, metrics for each split
+     * will be recorded independently.
+     *
+     * @return Query minimum execution time.
+     */
+    public long queryMinimumExecutionTime();
+
+    /**
+     * Gets query average execution time. If query execution was split to different nodes, metrics for each split
+     * will be recorded independently.
+     *
+     * @return Query average execution time.
+     */
+    public long queryAverageExecutionTime();
+
+    /**
+     * Gets maximum number of unique nodes participated in query execution. If query execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Maximum number of unique nodes in query execution.
+     */
+    public int queryMaximumExecutionNodes();
+
+    /**
+     * Gets minimum number of unique nodes participated in query execution. If query execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Minimum number of unique nodes in query execution.
+     */
+    public int queryMinimumExecutionNodes();
+
+    /**
+     * Gets average number of unique nodes participated in query execution. If query execution was split,
+     * metrics for each split will be recorded independently.
+     *
+     * @return Average number of unique nodes in query execution.
+     */
+    public int queryAverageExecutionNodes();
+
+    /**
+     * Gets number of current active sessions. Since event execution sessions are tracked only when
+     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+     *
+     * @return Number of current active sessions.
+     */
+    public int currentActiveSessions();
+
+    /**
+     * Gets maximum number of active sessions since last reset. Since event execution sessions are tracked only when
+     * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+     * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+     *
+     * @return Maximum active sessions since last reset.
+     */
+    public int maximumActiveSessions();
+
+    /**
+     * Gets number of failures. If {@code atLeastOnce} flag is set in streamer configuration, then only root node
+     * failures will be counted. Otherwise each node will count failures independently.
+     *
+     * @return Failures count.
+     */
+    public int failuresCount();
+
+    /**
+     * Gets maximum number of threads in executor service.
+     *
+     * @return Maximum number of threads in executor service.
+     */
+    public int executorServiceCapacity();
+
+    /**
+     * Gets current stage metrics. If stage with given name is not configured,
+     * then {@link IllegalArgumentException} will be thrown.
+     *
+     * @param stageName Stage name.
+     * @return Stage metrics.
+     */
+    public GridStreamerStageMetrics stageMetrics(String stageName);
+
+    /**
+     * Gets metrics for all stages. Stage metrics order is the same as stage order in configuration.
+     *
+     * @return Stage metrics collection.
+     */
+    public Collection<GridStreamerStageMetrics> stageMetrics();
+
+    /**
+     * Gets current window metrics. If window with given name is not configured,
+     * then {@link IllegalArgumentException} will be thrown.
+     *
+     * @param winName Window name.
+     * @return Window metrics.
+     */
+    public GridStreamerWindowMetrics windowMetrics(String winName);
+
+    /**
+     * Gets metrics for all windows.
+     *
+     * @return Collection of window metrics.
+     */
+    public Collection<GridStreamerWindowMetrics> windowMetrics();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
index 97d3df1..5a71fc2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
@@ -30,7 +30,7 @@ import java.util.*;
  * via {@link AffinityEvent#affinityKey()} method. If event does not implement
  * {@link AffinityEvent} interface, then event itself will be used to determine affinity.
  */
-public class GridStreamerAffinityEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerAffinityEventRouter extends StreamerEventRouterAdapter {
     /** */
     public static final int REPLICA_CNT = 128;
 
@@ -56,7 +56,7 @@ public class GridStreamerAffinityEventRouter extends GridStreamerEventRouterAdap
     private Collection<UUID> addedNodes = new GridConcurrentHashSet<>();
 
     /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
         return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() :
             evt, ctx);
     }
@@ -66,7 +66,7 @@ public class GridStreamerAffinityEventRouter extends GridStreamerEventRouterAdap
      * @param ctx Context.
      * @return Rich node.
      */
-    private ClusterNode node(Object obj, GridStreamerContext ctx) {
+    private ClusterNode node(Object obj, StreamerContext ctx) {
         while (true) {
             Collection<ClusterNode> nodes = ctx.projection().nodes();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
index 99af374..c4f1d82 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.*;
  * via {@link CacheAffinityEvent#affinityKey()} method. If event does not implement
  * {@link CacheAffinityEvent} interface, then event will be routed always to local node.
  */
-public class GridStreamerCacheAffinityEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter {
     /**
      * All events that implement this interface will be routed based on key affinity.
      */
@@ -46,7 +46,7 @@ public class GridStreamerCacheAffinityEventRouter extends GridStreamerEventRoute
     private Ignite ignite;
 
     /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
         if (evt instanceof CacheAffinityEvent) {
             CacheAffinityEvent e = (CacheAffinityEvent)evt;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
index b9eccd0..6f6794a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
@@ -20,18 +20,18 @@ import java.util.*;
 /**
  * Local router. Always routes event to local node.
  */
-public class GridStreamerLocalEventRouter implements GridStreamerEventRouter {
+public class GridStreamerLocalEventRouter implements StreamerEventRouter {
     /** Grid instance. */
     @IgniteInstanceResource
     private Ignite ignite;
 
     /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
         return ignite.cluster().localNode();
     }
 
     /** {@inheritDoc} */
-    @Override public <T> Map<ClusterNode, Collection<T>> route(GridStreamerContext ctx, String stageName,
+    @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
         Collection<T> evts) {
         return F.asMap(ignite.cluster().localNode(), evts);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
index b780142..36f3c8e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
@@ -21,7 +21,7 @@ import java.util.*;
 /**
  * Random router. Routes event to random node.
  */
-public class GridStreamerRandomEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerRandomEventRouter extends StreamerEventRouterAdapter {
     /** Optional predicates to exclude nodes from routing. */
     private IgnitePredicate<ClusterNode>[] predicates;
 
@@ -56,7 +56,7 @@ public class GridStreamerRandomEventRouter extends GridStreamerEventRouterAdapte
     }
 
     /** {@inheritDoc} */
-    @Override public ClusterNode route(GridStreamerContext ctx, String stageName, Object evt) {
+    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
         Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates);
 
         if (F.isEmpty(nodes))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
index 4597405..2de11f2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
@@ -18,12 +18,12 @@ import java.util.concurrent.atomic.*;
 /**
  * Round robin router.
  */
-public class GridStreamerRoundRobinEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerRoundRobinEventRouter extends StreamerEventRouterAdapter {
     /** */
     private final AtomicLong lastOrder = new AtomicLong();
 
     /** {@inheritDoc} */
-    @Override public ClusterNode route(GridStreamerContext ctx, String stageName, Object evt) {
+    @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
         Collection<ClusterNode> nodes = ctx.projection().nodes();
 
         int idx = (int)(lastOrder.getAndIncrement() % nodes.size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
index 0e60c33..012b6c2 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
@@ -33,7 +33,7 @@
     <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" parent="base.grid.cfg">
         <property name="streamerConfiguration">
             <list>
-                <bean class="org.gridgain.grid.streamer.GridStreamerConfiguration">
+                <bean class="org.gridgain.grid.streamer.StreamerConfiguration">
                     <property name="windows">
                         <bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedSizeWindow">
                             <property name="maximumSize" value="500"/>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
index 9a3c859..1b7aea0 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
@@ -40,7 +40,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
     private Collection<GridStreamerStage> stages;
 
     /** Event router. */
-    private GridStreamerEventRouter router;
+    private StreamerEventRouter router;
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -62,8 +62,8 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
     /**
      * @return Streamer configuration.
      */
-    private GridStreamerConfiguration streamerConfiguration() {
-        GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+    private StreamerConfiguration streamerConfiguration() {
+        StreamerConfiguration cfg = new StreamerConfiguration();
 
         cfg.setRouter(router);
 
@@ -90,7 +90,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) throws GridException {
                 assert evts.size() == 1;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
index b4f7725..da98634 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
@@ -60,8 +60,8 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
     /**
      * @return Streamer configuration.
      */
-    private GridStreamerConfiguration streamerConfiguration() {
-        GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+    private StreamerConfiguration streamerConfiguration() {
+        StreamerConfiguration cfg = new StreamerConfiguration();
 
         cfg.setAtLeastOnce(true);
 
@@ -77,7 +77,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
 
         SC pass = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> objects) {
                 assert ctx.nextStageName() != null;
 
@@ -87,7 +87,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
         };
 
         SC put = new SC() {
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) {
                 ConcurrentMap<Object, AtomicInteger> cntrs = ctx.localSpace();
 
@@ -160,7 +160,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
 
             IgniteStreamer streamer = grid(0).streamer(null);
 
-            streamer.addStreamerFailureListener(new GridStreamerFailureListener() {
+            streamer.addStreamerFailureListener(new StreamerFailureListener() {
                 @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
                     info("Unable to failover events [stageName=" + stageName + ", err=" + err + ']');
 
@@ -206,7 +206,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
     /**
      * Test random router.
      */
-    private static class TestRandomRouter extends GridStreamerEventRouterAdapter {
+    private static class TestRandomRouter extends StreamerEventRouterAdapter {
         /** Source node ID. */
         private UUID srcNodeId;
 
@@ -214,7 +214,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
         private UUID destNodeId;
 
         /** {@inheritDoc} */
-        @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+        @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
             if ("put".equals(stageName))
                 return ctx.projection().node(destNodeId);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
index 083af44..8b7d2b3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
@@ -20,12 +20,12 @@ import org.jetbrains.annotations.*;
 import java.util.*;
 
 /**
- * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link GridStreamerConfiguration}.
+ * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link org.gridgain.grid.streamer.StreamerConfiguration}.
  */
 public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwareSelfTest {
     /**
      */
-    private static class TestEventRouter extends TestLifecycleAware implements GridStreamerEventRouter {
+    private static class TestEventRouter extends TestLifecycleAware implements StreamerEventRouter {
         /**
          */
         TestEventRouter() {
@@ -33,12 +33,12 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+        @Nullable @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
             return null;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <T> Map<ClusterNode, Collection<T>> route(GridStreamerContext ctx,
+        @Nullable @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx,
             String stageName, Collection<T> evts) {
             return null;
         }
@@ -59,7 +59,7 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection evts) {
+        @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
             return null;
         }
     }
@@ -177,7 +177,7 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
     @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        GridStreamerConfiguration streamerCfg = new GridStreamerConfiguration();
+        StreamerConfiguration streamerCfg = new StreamerConfiguration();
 
         TestEventRouter router = new TestEventRouter();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
index 51986a9..7eb843f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
@@ -51,7 +51,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
     private Collection<GridStreamerStage> stages;
 
     /** Event router. */
-    private GridStreamerEventRouter router;
+    private StreamerEventRouter router;
 
     /** P2P enabled flag. */
     private boolean p2pEnabled;
@@ -81,8 +81,8 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
     /**
      * @return Streamer configuration.
      */
-    private GridStreamerConfiguration streamerConfiguration() {
-        GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+    private StreamerConfiguration streamerConfiguration() {
+        StreamerConfiguration cfg = new StreamerConfiguration();
 
         cfg.setAtLeastOnce(atLeastOnce);
 
@@ -114,7 +114,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
                 return "name";
             }
 
-            @Nullable @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection evts) {
+            @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
                 assert g != null;
                 assert log != null;
 
@@ -155,7 +155,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts)
                 throws GridException {
                 String nextStage = ctx.nextStageName();
@@ -214,18 +214,18 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
             // Wait until all acks are received.
             GridTestUtils.retryAssert(log, 100, 50, new CA() {
                 @Override public void apply() {
-                    GridStreamerMetrics metrics = ignite0.streamer(null).metrics();
+                    StreamerMetrics metrics = ignite0.streamer(null).metrics();
 
                     assertEquals(0, metrics.currentActiveSessions());
                 }
             });
 
-            GridStreamerMetrics metrics = ignite0.streamer(null).metrics();
+            StreamerMetrics metrics = ignite0.streamer(null).metrics();
 
             assertTrue(metrics.maximumActiveSessions() > 0);
 
-            ignite0.streamer(null).context().query(new IgniteClosure<GridStreamerContext, Object>() {
-                @Override public Object apply(GridStreamerContext ctx) {
+            ignite0.streamer(null).context().query(new IgniteClosure<StreamerContext, Object>() {
+                @Override public Object apply(StreamerContext ctx) {
                     try {
                         U.sleep(1000);
                     }
@@ -265,7 +265,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) {
                 String nextStage = ctx.nextStageName();
 
@@ -325,7 +325,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) {
                 String nextStage = ctx.nextStageName();
 
@@ -385,7 +385,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) {
                 String nextStage = ctx.nextStageName();
 
@@ -411,7 +411,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
             final CountDownLatch errLatch = new CountDownLatch(1);
 
-            grid(0).streamer(null).addStreamerFailureListener(new GridStreamerFailureListener() {
+            grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() {
                 @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
                     info("Expected failure: " + err.getMessage());
 
@@ -449,7 +449,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) throws GridException {
                 String nextStage = ctx.nextStageName();
 
@@ -530,7 +530,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) {
                 ConcurrentMap<String, AtomicInteger> space = ctx.localSpace();
 
@@ -605,12 +605,12 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
             for (String s : stages)
                 assertEquals((Integer)sum, stagesSum.get(s));
 
-            GridStreamerContext streamerCtx = grid(0).streamer(null).context();
+            StreamerContext streamerCtx = grid(0).streamer(null).context();
 
             // Check query.
             for (final String s : stages) {
-                Collection<Integer> res = streamerCtx.query(new C1<GridStreamerContext, Integer>() {
-                    @Override public Integer apply(GridStreamerContext ctx) {
+                Collection<Integer> res = streamerCtx.query(new C1<StreamerContext, Integer>() {
+                    @Override public Integer apply(StreamerContext ctx) {
                         AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s);
 
                         return cntr.get();
@@ -621,8 +621,8 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
             }
 
             // Check broadcast.
-            streamerCtx.broadcast(new CI1<GridStreamerContext>() {
-                @Override public void apply(GridStreamerContext ctx) {
+            streamerCtx.broadcast(new CI1<StreamerContext>() {
+                @Override public void apply(StreamerContext ctx) {
                     int sum = 0;
 
                     ConcurrentMap<String, AtomicInteger> space = ctx.localSpace();
@@ -652,8 +652,8 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
             // Check reduce.
             for (final String s : stages) {
                 Integer res = streamerCtx.reduce(
-                    new C1<GridStreamerContext, Integer>() {
-                        @Override public Integer apply(GridStreamerContext ctx) {
+                    new C1<StreamerContext, Integer>() {
+                        @Override public Integer apply(StreamerContext ctx) {
                             AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s);
 
                             return cntr.get();
@@ -683,7 +683,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
         SC stage = new SC() {
             @SuppressWarnings("unchecked")
-            @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+            @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
                 Collection<Object> evts) {
                 return ctx.nextStageName() == null ? null : (Map)F.asMap(ctx.nextStageName(), F.asList(0));
             }
@@ -699,7 +699,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
 
             final CountDownLatch errLatch = new CountDownLatch(errCnt);
 
-            grid(0).streamer(null).addStreamerFailureListener(new GridStreamerFailureListener() {
+            grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() {
                 @Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
                     info("Expected failure: " + err.getMessage());
 
@@ -726,7 +726,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
     private void checkMetrics(Ignite ignite, String stage, int evtCnt, boolean pipeline) {
         IgniteStreamer streamer = ignite.streamer(null);
 
-        GridStreamerMetrics metrics = streamer.metrics();
+        StreamerMetrics metrics = streamer.metrics();
 
         assertEquals(evtCnt, metrics.stageTotalExecutionCount());
         assertEquals(0, metrics.stageWaitingExecutionCount());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
index a8585b3..5e6f4f7 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
@@ -39,7 +39,7 @@ class GridTestStage implements GridStreamerStage<Object> {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection<Object> evts)
+    @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts)
         throws GridException {
         return stageClos.apply(name(), ctx, evts);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
index 99e7887..fd9b642 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
@@ -17,7 +17,7 @@ import java.util.*;
 /**
  * Test router.
  */
-class GridTestStreamerEventRouter extends GridStreamerEventRouterAdapter {
+class GridTestStreamerEventRouter extends StreamerEventRouterAdapter {
     /** Route table. */
     private Map<String, UUID> routeTbl = new HashMap<>();
 
@@ -30,7 +30,7 @@ class GridTestStreamerEventRouter extends GridStreamerEventRouterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+    @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
         UUID nodeId = routeTbl.get(stageName);
 
         if (nodeId == null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
index 1e0e079..ab33222 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
@@ -18,6 +18,6 @@ import java.util.*;
  * Typedef for generic closure.
  */
 abstract class SC
-    extends GridClosure3X<String, GridStreamerContext, Collection<Object>, Map<String, Collection<?>>> {
+    extends GridClosure3X<String, StreamerContext, Collection<Object>, Map<String, Collection<?>>> {
     // No-op.
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
index 92aefe0..812ddb7 100644
--- a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
@@ -112,18 +112,18 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest
     /**
      * @return Streamer configuration.
      */
-    private static GridStreamerConfiguration streamerConfiguration() {
+    private static StreamerConfiguration streamerConfiguration() {
         Collection<GridStreamerStage> stages = F.<GridStreamerStage>asList(new GridStreamerStage() {
             @Override public String name() {
                 return "name";
             }
 
-            @Nullable @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection evts) {
+            @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
                 return null;
             }
         });
 
-        GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+        StreamerConfiguration cfg = new StreamerConfiguration();
 
         cfg.setAtLeastOnce(true);
         cfg.setWindows(F.asList((GridStreamerWindow)new GridStreamerUnboundedWindow()));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java b/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
index be07734..e6f75ac 100644
--- a/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
+++ b/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
@@ -26,7 +26,7 @@ class TestStage implements GridStreamerStage<Integer> {
     }
 
     /** {@inheritDoc} */
-    @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection<Integer> evts)
+    @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> evts)
         throws GridException {
         ConcurrentMap<String, TestAverage> loc = ctx.localSpace();