You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:12 UTC

[11/32] incubator-ignite git commit: # Renaming

# Renaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/34694f39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/34694f39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/34694f39

Branch: refs/heads/master
Commit: 34694f39afff2b40cf01a84a7bb81a13b9d1341e
Parents: e9fe37f
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:43:39 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:43:39 2014 +0300

----------------------------------------------------------------------
 .../streaming/StreamingCheckInExample.java      |  10 +-
 .../StreamingPopularNumbersExample.java         |   4 +-
 .../streaming/StreamingPriceBarsExample.java    |   8 +-
 .../StreamingRunningAverageExample.java         |   4 +-
 .../java/org/apache/ignite/IgniteStreamer.java  |   6 +-
 .../streamer/GridStreamProcessor.java           |  14 +-
 .../streamer/GridStreamerAttributes.java        |   2 +-
 .../streamer/GridStreamerContextDelegate.java   |   4 +-
 .../streamer/GridStreamerContextImpl.java       |   6 +-
 .../streamer/GridStreamerStageMBeanAdapter.java |  93 ---------
 .../GridStreamerStageMetricsAdapter.java        | 127 ------------
 .../GridStreamerStageMetricsHolder.java         | 159 ---------------
 .../streamer/GridStreamerStageWrapper.java      |  84 --------
 .../GridStreamerWindowMetricsAdapter.java       |  57 ------
 .../GridStreamerWindowMetricsHolder.java        |  42 ----
 .../processors/streamer/IgniteStreamerEx.java   |   4 +-
 .../processors/streamer/IgniteStreamerImpl.java |  48 ++---
 .../streamer/StreamerMetricsAdapter.java        |  28 +--
 .../streamer/StreamerMetricsHolder.java         |  30 +--
 .../streamer/StreamerStageMBeanAdapter.java     |  93 +++++++++
 .../streamer/StreamerStageMetricsAdapter.java   | 127 ++++++++++++
 .../streamer/StreamerStageMetricsHolder.java    | 159 +++++++++++++++
 .../streamer/StreamerStageWrapper.java          |  84 ++++++++
 .../streamer/StreamerWindowMetricsAdapter.java  |  57 ++++++
 .../streamer/StreamerWindowMetricsHolder.java   |  42 ++++
 .../visor/streamer/VisorStreamerMetrics.java    |   2 +-
 .../streamer/VisorStreamerStageMetrics.java     |   6 +-
 .../grid/streamer/GridStreamerStage.java        |  53 -----
 .../grid/streamer/GridStreamerStageMBean.java   | 106 ----------
 .../grid/streamer/GridStreamerStageMetrics.java |  85 --------
 .../grid/streamer/GridStreamerWindow.java       | 199 -------------------
 .../grid/streamer/GridStreamerWindowMBean.java  |  50 -----
 .../streamer/GridStreamerWindowMetrics.java     |  36 ----
 .../grid/streamer/StreamerConfiguration.java    |  12 +-
 .../gridgain/grid/streamer/StreamerContext.java |   6 +-
 .../gridgain/grid/streamer/StreamerMetrics.java |   8 +-
 .../gridgain/grid/streamer/StreamerStage.java   |  53 +++++
 .../grid/streamer/StreamerStageMBean.java       | 106 ++++++++++
 .../grid/streamer/StreamerStageMetrics.java     |  85 ++++++++
 .../gridgain/grid/streamer/StreamerWindow.java  | 199 +++++++++++++++++++
 .../grid/streamer/StreamerWindowMBean.java      |  50 +++++
 .../grid/streamer/StreamerWindowMetrics.java    |  36 ++++
 .../grid/streamer/index/GridStreamerIndex.java  |   9 +-
 .../index/GridStreamerIndexProvider.java        |   3 +-
 .../window/GridStreamerWindowAdapter.java       |   4 +-
 .../streamer/GridStreamerEvictionSelfTest.java  |   8 +-
 .../streamer/GridStreamerFailoverSelfTest.java  |   4 +-
 .../GridStreamerLifecycleAwareSelfTest.java     |   8 +-
 .../streamer/GridStreamerSelfTest.java          |  24 +--
 .../processors/streamer/GridTestStage.java      |   2 +-
 .../marshaller/GridMarshallerAbstractTest.java  |   4 +-
 .../window/GridStreamerWindowSelfTest.java      |   6 +-
 .../loadtests/streamer/average/TestStage.java   |   4 +-
 53 files changed, 1229 insertions(+), 1231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
index c4a79df..f10dec1 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
@@ -165,7 +165,7 @@ public class StreamingCheckInExample {
                         new IgniteClosure<StreamerContext, Map<String, Place>>() {
                             @Override public Map<String, Place> apply(
                                 StreamerContext ctx) {
-                                GridStreamerWindow<LocationInfo> win =
+                                StreamerWindow<LocationInfo> win =
                                     ctx.window(DetectPlacesStage.class.getSimpleName());
 
                                 assert win != null;
@@ -416,7 +416,7 @@ public class StreamingCheckInExample {
      * with unique index to block repetitive check-ins.
      */
     @SuppressWarnings("PublicInnerClass")
-    public static class AddToWindowStage implements GridStreamerStage<CheckInEvent> {
+    public static class AddToWindowStage implements StreamerStage<CheckInEvent> {
         /** {@inheritDoc} */
         @Override public String name() {
             return getClass().getSimpleName();
@@ -425,7 +425,7 @@ public class StreamingCheckInExample {
         /** {@inheritDoc} */
         @Nullable @Override public Map<String, Collection<?>> run(
             StreamerContext ctx, Collection<CheckInEvent> evts) throws GridException {
-            GridStreamerWindow<CheckInEvent> win = ctx.window(name());
+            StreamerWindow<CheckInEvent> win = ctx.window(name());
 
             assert win != null;
 
@@ -464,7 +464,7 @@ public class StreamingCheckInExample {
      * Check-in event processing stage that detects the
      * check-in places.
      */
-    private static class DetectPlacesStage implements GridStreamerStage<CheckInEvent> {
+    private static class DetectPlacesStage implements StreamerStage<CheckInEvent> {
         /** {@inheritDoc} */
         @Override public String name() {
             return getClass().getSimpleName();
@@ -473,7 +473,7 @@ public class StreamingCheckInExample {
         /** {@inheritDoc} */
         @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx,
             Collection<CheckInEvent> evts) throws GridException {
-            GridStreamerWindow<LocationInfo> win = ctx.window(name());
+            StreamerWindow<LocationInfo> win = ctx.window(name());
 
             assert win != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
index 75208d8..8794ba9 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
@@ -193,7 +193,7 @@ public class StreamingPopularNumbersExample {
      * Sample streamer stage to compute average.
      */
     @SuppressWarnings("PublicInnerClass")
-    public static class StreamerStage implements GridStreamerStage<Integer> {
+    public static class StreamerStage implements org.gridgain.grid.streamer.StreamerStage<Integer> {
         /** {@inheritDoc} */
         @Override public String name() {
             return "exampleStage";
@@ -202,7 +202,7 @@ public class StreamingPopularNumbersExample {
         /** {@inheritDoc} */
         @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> nums)
             throws GridException {
-            GridStreamerWindow<Integer> win = ctx.window();
+            StreamerWindow<Integer> win = ctx.window();
 
             // Add numbers to window.
             win.enqueueAll(nums);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
index 918d783..ac64a93 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
@@ -364,7 +364,7 @@ public class StreamingPriceBarsExample {
      * The first stage where 1 second bars are built.
      */
     @SuppressWarnings({ "PublicInnerClass", "unchecked" })
-    public static class FirstStage implements GridStreamerStage<Quote> {
+    public static class FirstStage implements StreamerStage<Quote> {
         /** {@inheritDoc} */
         @Override public String name() {
             return getClass().getSimpleName();
@@ -373,7 +373,7 @@ public class StreamingPriceBarsExample {
         /** {@inheritDoc} */
         @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Quote> quotes)
             throws GridException {
-            GridStreamerWindow win = ctx.window("stage1");
+            StreamerWindow win = ctx.window("stage1");
 
             // Add numbers to window.
             win.enqueueAll(quotes);
@@ -405,7 +405,7 @@ public class StreamingPriceBarsExample {
      * The second stage where 2 second bars are built.
      */
     @SuppressWarnings({ "PublicInnerClass", "unchecked" })
-    public static class SecondStage implements GridStreamerStage<Bar> {
+    public static class SecondStage implements StreamerStage<Bar> {
         /** {@inheritDoc} */
         @Override public String name() {
             return getClass().getSimpleName();
@@ -416,7 +416,7 @@ public class StreamingPriceBarsExample {
             throws GridException {
             ConcurrentMap<String, Bar> loc = ctx.localSpace();
 
-            GridStreamerWindow win = ctx.window("stage2");
+            StreamerWindow win = ctx.window("stage2");
 
             // Add numbers to window.
             win.enqueueAll(bars);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
index b59aaf2..6a62456 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
@@ -141,7 +141,7 @@ public class StreamingRunningAverageExample {
     /**
      * Sample streamer stage to compute average.
      */
-    public static class StreamerStage implements GridStreamerStage<Integer> {
+    public static class StreamerStage implements org.gridgain.grid.streamer.StreamerStage<Integer> {
         /** {@inheritDoc} */
         @Override public String name() {
             return "exampleStage";
@@ -166,7 +166,7 @@ public class StreamingRunningAverageExample {
             for (Integer e : evts)
                 avg.add(e, 1);
 
-            GridStreamerWindow<Integer> win = ctx.window();
+            StreamerWindow<Integer> win = ctx.window();
 
             // Add input events to window.
             win.enqueueAll(evts);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
index 945cff2..89ce78d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
@@ -18,7 +18,7 @@ import java.util.*;
 /**
  * Streamer interface. Streamer provides an easy way to process large (possibly infinite) stream of
  * events. Event can be of any object type, different types of events can be submitted to streamer. Each event
- * is processed by one or more {@link org.gridgain.grid.streamer.GridStreamerStage}, a set of stages event passed through is called pipeline.
+ * is processed by one or more {@link org.gridgain.grid.streamer.StreamerStage}, a set of stages event passed through is called pipeline.
  * <p>
  * For each submitted group of events streamer determines one or more execution nodes that will process this
  * group of events. Execution nodes are determined by {@link org.gridgain.grid.streamer.StreamerEventRouter}. Execution nodes run stages
@@ -36,7 +36,7 @@ import java.util.*;
  * of failure and will try to execute pipeline from the beginning. If failover cannot be succeeded or maximum number
  * of failover attempts is exceeded, then listener will be notified on node which originated pipeline execution.
  *
- * @see org.gridgain.grid.streamer.GridStreamerStage
+ * @see org.gridgain.grid.streamer.StreamerStage
  * @see org.gridgain.grid.streamer.StreamerEventRouter
  */
 public interface IgniteStreamer {
@@ -124,7 +124,7 @@ public interface IgniteStreamer {
     public StreamerMetrics metrics();
 
     /**
-     * Resets all configured streamer windows by calling {@link GridStreamerWindow#reset()} on each and
+     * Resets all configured streamer windows by calling {@link org.gridgain.grid.streamer.StreamerWindow#reset()} on each and
      * clears local space.
      * <p>
      * This is local method, it will clear only local windows and local space. Note that windows and

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
index 8330a06..10cb8ac 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
@@ -76,11 +76,11 @@ public class GridStreamProcessor extends GridProcessorAdapter {
             }
 
             // Add mbeans for stages.
-            for (GridStreamerStage stage : s.configuration().getStages()) {
+            for (StreamerStage stage : s.configuration().getStages()) {
                 try {
                     mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), "Stage-" + stage.name(),
-                        new GridStreamerStageMBeanAdapter(stage.name(), stage.getClass().getName(), s),
-                        GridStreamerStageMBean.class));
+                        new StreamerStageMBeanAdapter(stage.name(), stage.getClass().getName(), s),
+                        StreamerStageMBean.class));
 
                     if (log.isDebugEnabled())
                         log.debug("Registered MBean for streamer stage [streamer=" + s.name() +
@@ -93,13 +93,13 @@ public class GridStreamProcessor extends GridProcessorAdapter {
             }
 
             // Add mbeans for windows.
-            for (GridStreamerWindow win : s.configuration().getWindows()) {
+            for (StreamerWindow win : s.configuration().getWindows()) {
                 try {
-                    if (hasInterface(win.getClass(), GridStreamerWindowMBean.class)) {
+                    if (hasInterface(win.getClass(), StreamerWindowMBean.class)) {
                         mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()),
                             "Window-" + win.name(),
-                            (GridStreamerWindowMBean)win,
-                            GridStreamerWindowMBean.class));
+                            (StreamerWindowMBean)win,
+                            StreamerWindowMBean.class));
 
                         if (log.isDebugEnabled())
                             log.debug("Registered MBean for streamer window [streamer=" + s.name() +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
index b7849e4..18b0701 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
@@ -59,7 +59,7 @@ public class GridStreamerAttributes implements Externalizable {
         stages = new LinkedList<>();
 
         if (!F.isEmpty(cfg.getStages())) {
-            for (GridStreamerStage stage : cfg.getStages())
+            for (StreamerStage stage : cfg.getStages())
                 stages.add(F.t(stage.name(), stage.getClass().getName()));
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
index fb8e1bf..064e3db 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
@@ -48,12 +48,12 @@ public class GridStreamerContextDelegate implements StreamerContext {
     }
 
     /** {@inheritDoc} */
-    @Override public <E> GridStreamerWindow<E> window() {
+    @Override public <E> StreamerWindow<E> window() {
         return delegate.window();
     }
 
     /** {@inheritDoc} */
-    @Override public <E> GridStreamerWindow<E> window(String winName) {
+    @Override public <E> StreamerWindow<E> window(String winName) {
         return delegate.window(winName);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
index c8320ab..e9c64ca 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
@@ -74,13 +74,13 @@ public class GridStreamerContextImpl implements StreamerContext {
     }
 
     /** {@inheritDoc} */
-    @Override public <E> GridStreamerWindow<E> window() {
+    @Override public <E> StreamerWindow<E> window() {
         return streamer.window();
     }
 
     /** {@inheritDoc} */
-    @Override public <E> GridStreamerWindow<E> window(String winName) {
-        GridStreamerWindow<E> window = streamer.window(winName);
+    @Override public <E> StreamerWindow<E> window(String winName) {
+        StreamerWindow<E> window = streamer.window(winName);
 
         if (window == null)
             throw new IllegalArgumentException("Streamer window is not configured: " + winName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java
deleted file mode 100644
index 1a8ba97..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-
-/**
- * Streamer stage MBean adapter.
- */
-@SuppressWarnings("ConstantConditions")
-public class GridStreamerStageMBeanAdapter implements GridStreamerStageMBean {
-    /** Stage name. */
-    private String stageName;
-
-    /** Stage class name. */
-    private String stageClsName;
-
-    /** */
-    private IgniteStreamerImpl streamer;
-
-    /**
-     * @param stageName Stage name.
-     * @param stageClsName Stage class name.
-     * @param streamer Streamer implementation.
-     */
-    public GridStreamerStageMBeanAdapter(String stageName, String stageClsName, IgniteStreamerImpl streamer) {
-        this.stageName = stageName;
-        this.stageClsName = stageClsName;
-        this.streamer = streamer;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return stageName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getStageClassName() {
-        return stageClsName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getMinimumExecutionTime() {
-        return streamer.metrics().stageMetrics(stageName).minimumExecutionTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getMaximumExecutionTime() {
-        return streamer.metrics().stageMetrics(stageName).maximumExecutionTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getAverageExecutionTime() {
-        return streamer.metrics().stageMetrics(stageName).averageExecutionTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getMinimumWaitingTime() {
-        return streamer.metrics().stageMetrics(stageName).minimumWaitingTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getMaximumWaitingTime() {
-        return streamer.metrics().stageMetrics(stageName).maximumWaitingTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getAverageWaitingTime() {
-        return streamer.metrics().stageMetrics(stageName).averageWaitingTime();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getTotalExecutionCount() {
-        return streamer.metrics().stageMetrics(stageName).totalExecutionCount();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int getFailuresCount() {
-        return streamer.metrics().stageMetrics(stageName).failuresCount();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isExecuting() {
-        return streamer.metrics().stageMetrics(stageName).executing();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java
deleted file mode 100644
index e73ff4f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Streamer stage metrics adapter.
- */
-public class GridStreamerStageMetricsAdapter implements GridStreamerStageMetrics {
-    /** */
-    private String name;
-
-    /** */
-    private long minExecTime;
-
-    /** */
-    private long maxExecTime;
-
-    /** */
-    private long avgExecTime;
-
-    /** */
-    private long minWaitTime;
-
-    /** */
-    private long maxWaitTime;
-
-    /** */
-    private long avgWaitTime;
-
-    /** */
-    private long totalExecCnt;
-
-    /** */
-    private int failuresCnt;
-
-    /** */
-    private boolean executing;
-
-    /**
-     * Empty constructor.
-     */
-    public GridStreamerStageMetricsAdapter() {
-        // No-op.
-    }
-
-    /**
-     * @param metrics Metrics.
-     */
-    public GridStreamerStageMetricsAdapter(GridStreamerStageMetrics metrics) {
-        // Preserve alphabetic order for maintenance.
-        avgExecTime = metrics.averageExecutionTime();
-        avgWaitTime = metrics.averageWaitingTime();
-        executing = metrics.executing();
-        failuresCnt = metrics.failuresCount();
-        maxExecTime = metrics.maximumExecutionTime();
-        maxWaitTime = metrics.maximumWaitingTime();
-        minExecTime = metrics.minimumExecutionTime();
-        minWaitTime = metrics.minimumWaitingTime();
-        name = metrics.name();
-        totalExecCnt = metrics.totalExecutionCount();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long minimumExecutionTime() {
-        return minExecTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long maximumExecutionTime() {
-        return maxExecTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long averageExecutionTime() {
-        return avgExecTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long totalExecutionCount() {
-        return totalExecCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long minimumWaitingTime() {
-        return minWaitTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long maximumWaitingTime() {
-        return maxWaitTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long averageWaitingTime() {
-        return avgWaitTime;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int failuresCount() {
-        return failuresCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean executing() {
-        return executing;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerStageMetricsAdapter.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java
deleted file mode 100644
index e45830c..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.*;
-import org.jdk8.backport.*;
-
-/**
- * Streamer stage metrics holder.
- */
-public class GridStreamerStageMetricsHolder implements GridStreamerStageMetrics {
-    /** Stage name. */
-    private String name;
-
-    /** Minimum execution time. */
-    private GridAtomicLong minExecTime = new GridAtomicLong(Long.MAX_VALUE);
-
-    /** Maximum execution time. */
-    private GridAtomicLong maxExecTime = new GridAtomicLong();
-
-    /** Stage execution time sum. */
-    private LongAdder sumExecTime = new LongAdder();
-
-    /** Stage minimum waiting time. */
-    private GridAtomicLong minWaitTime = new GridAtomicLong(Long.MAX_VALUE);
-
-    /** Stage maximum waiting time. */
-    private GridAtomicLong maxWaitTime = new GridAtomicLong();
-
-    /** Stage average waiting time sum. */
-    private LongAdder sumWaitTime = new LongAdder();
-
-    /** Total number of times this stage was executed. */
-    private LongAdder totalExecCnt = new LongAdder();
-
-    /** Failures count. */
-    private LongAdder failuresCnt = new LongAdder();
-
-    /** Number of threads executing this stage. */
-    private LongAdder curActive = new LongAdder();
-
-    /**
-     * @param name Stage name.
-     */
-    public GridStreamerStageMetricsHolder(String name) {
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long minimumExecutionTime() {
-        long min = minExecTime.get();
-
-        return min == Long.MAX_VALUE ? 0 : min;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long maximumExecutionTime() {
-        return maxExecTime.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long averageExecutionTime() {
-        long execTime = sumExecTime.sum();
-
-        long execs = totalExecCnt.sum();
-
-        return execs == 0 ? 0 : execTime / execs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long minimumWaitingTime() {
-        long min = minWaitTime.get();
-
-        return min == Long.MAX_VALUE ? 0 : min;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long maximumWaitingTime() {
-        return maxWaitTime.get();
-    }
-
-    /** {@inheritDoc} */
-    @Override public long averageWaitingTime() {
-        long waitTime = sumWaitTime.sum();
-
-        long execs = totalExecCnt.sum();
-
-        return execs == 0 ? 0 : waitTime / execs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long totalExecutionCount() {
-        return totalExecCnt.longValue();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int failuresCount() {
-        return failuresCnt.intValue();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean executing() {
-        return curActive.intValue() > 0;
-    }
-
-    /**
-     * Execution started callback.
-     *
-     * @param waitTime Wait time.
-     */
-    public void onExecutionStarted(long waitTime) {
-        if (waitTime < 0)
-            waitTime = 0;
-
-        curActive.increment();
-
-        maxWaitTime.setIfGreater(waitTime);
-        minWaitTime.setIfLess(waitTime);
-        sumWaitTime.add(waitTime);
-    }
-
-    /**
-     * Execution finished callback.
-     *
-     * @param execTime Stage execution time.
-     */
-    public void onExecutionFinished(long execTime) {
-        if (execTime < 0)
-            execTime = 0;
-
-        curActive.decrement();
-
-        maxExecTime.setIfGreater(execTime);
-        minExecTime.setIfLess(execTime);
-        sumExecTime.add(execTime);
-
-        totalExecCnt.increment();
-    }
-
-    /**
-     * Failure callback.
-     */
-    public void onFailure() {
-        failuresCnt.increment();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java
deleted file mode 100644
index 74b75ae..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Stage wrapper that handles metrics calculation and time measurement.
- */
-public class GridStreamerStageWrapper implements GridStreamerStage<Object> {
-    /** Stage delegate. */
-    private GridStreamerStage<Object> delegate;
-
-    /** Stage index. */
-    private int idx;
-
-    /** Next stage name. Set after creation. */
-    private String nextStageName;
-
-    /**
-     * @param delegate Delegate stage.
-     * @param idx Index.
-     */
-    public GridStreamerStageWrapper(GridStreamerStage<Object> delegate, int idx) {
-        this.delegate = delegate;
-        this.idx = idx;
-    }
-
-    /**
-     * @return Stage index.
-     */
-    public int index() {
-        return idx;
-    }
-
-    /**
-     * @return Next stage name in pipeline or {@code null} if this is the last stage.
-     */
-    @Nullable public String nextStageName() {
-        return nextStageName;
-    }
-
-    /**
-     * @param nextStageName Next stage name in pipeline.
-     */
-    public void nextStageName(String nextStageName) {
-        this.nextStageName = nextStageName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return delegate.name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts)
-        throws GridException {
-        return delegate.run(ctx, evts);
-    }
-
-    /**
-     * @return Delegate.
-     */
-    public GridStreamerStage unwrap() {
-        return delegate;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerStageWrapper.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java
deleted file mode 100644
index c11abdf..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Streamer window metrics adapter.
- */
-public class GridStreamerWindowMetricsAdapter implements GridStreamerWindowMetrics {
-    /** Window name. */
-    private String name;
-
-    /** Window size. */
-    private int size;
-
-    /** Window eviction queue size. */
-    private int evictionQueueSize;
-
-    /**
-     * @param m Metrics to copy.
-     */
-    public GridStreamerWindowMetricsAdapter(GridStreamerWindowMetrics m) {
-        // Preserve alphabetic order for maintenance.
-        evictionQueueSize = m.evictionQueueSize();
-        name = m.name();
-        size = m.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return size;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int evictionQueueSize() {
-        return evictionQueueSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerWindowMetricsAdapter.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java
deleted file mode 100644
index 11ed8c6..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-
-/**
- * Streamer window metrics holder.
- */
-public class GridStreamerWindowMetricsHolder implements GridStreamerWindowMetrics {
-    /** Window instance. */
-    private GridStreamerWindow window;
-
-    /**
-     * @param window Streamer window.
-     */
-    public GridStreamerWindowMetricsHolder(GridStreamerWindow window) {
-        this.window = window;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return window.name();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return window.size();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int evictionQueueSize() {
-        return window.evictionQueueSize();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
index c4cd9dd..fc51a0e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
@@ -31,7 +31,7 @@ public interface IgniteStreamerEx extends IgniteStreamer {
      *
      * @return Streamer window.
      */
-    public <E> GridStreamerWindow<E> window();
+    public <E> StreamerWindow<E> window();
 
     /**
      * Gets streamer window by window name.
@@ -39,7 +39,7 @@ public interface IgniteStreamerEx extends IgniteStreamer {
      * @param windowName Window name.
      * @return Streamer window.
      */
-    @Nullable public <E> GridStreamerWindow<E> window(String windowName);
+    @Nullable public <E> StreamerWindow<E> window(String windowName);
 
     /**
      * Called before execution requests are sent to remote nodes or scheduled for local execution.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
index deb3e16..1098e84 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
@@ -78,14 +78,14 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
 
     /** Stages. */
     @GridToStringInclude
-    private Map<String, GridStreamerStageWrapper> stages;
+    private Map<String, StreamerStageWrapper> stages;
 
     /** Windows. */
     @GridToStringInclude
-    private Map<String, GridStreamerWindow> winMap;
+    private Map<String, StreamerWindow> winMap;
 
     /** Default streamer window. */
-    private GridStreamerWindow dfltWin;
+    private StreamerWindow dfltWin;
 
     /** */
     private String firstStage;
@@ -191,9 +191,9 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
 
         int stageIdx = 0;
 
-        GridStreamerStageWrapper prev = null;
+        StreamerStageWrapper prev = null;
 
-        for (GridStreamerStage s : c.getStages()) {
+        for (StreamerStage s : c.getStages()) {
             String sName = s.name();
 
             if (F.isEmpty(sName))
@@ -207,7 +207,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
             if (firstStage == null)
                 firstStage = sName;
 
-            GridStreamerStageWrapper wrapper = new GridStreamerStageWrapper(s, stageIdx);
+            StreamerStageWrapper wrapper = new StreamerStageWrapper(s, stageIdx);
 
             stages.put(sName, wrapper);
 
@@ -221,7 +221,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
 
         winMap = new LinkedHashMap<>();
 
-        for (GridStreamerWindow w : c.getWindows()) {
+        for (StreamerWindow w : c.getWindows()) {
             String wName = w.name();
 
             if (F.isEmpty(wName))
@@ -292,7 +292,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
      * @throws GridException If failed.
      */
     private void prepareResources() throws GridException {
-        for (GridStreamerStage s : c.getStages())
+        for (StreamerStage s : c.getStages())
             ctx.resource().injectGeneric(s);
 
         if (router == null)
@@ -300,7 +300,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
 
         ctx.resource().injectGeneric(router);
 
-        for (GridStreamerWindow w : c.getWindows())
+        for (StreamerWindow w : c.getWindows())
             ctx.resource().injectGeneric(w);
     }
 
@@ -359,7 +359,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
             }
         }
 
-        for (GridStreamerStageWrapper stage : stages.values()) {
+        for (StreamerStageWrapper stage : stages.values()) {
             try {
                 ctx.resource().cleanupGeneric(stage.unwrap());
             }
@@ -482,7 +482,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
         winLock.writeLock();
 
         try {
-            for (GridStreamerWindow win : winMap.values())
+            for (StreamerWindow win : winMap.values())
                 win.reset();
 
             streamerCtx.localSpace().clear();
@@ -494,22 +494,22 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
 
     /** {@inheritDoc} */
     @Override public void resetMetrics() {
-        GridStreamerStageMetricsHolder[] stageHolders = new GridStreamerStageMetricsHolder[c.getStages().size()];
+        StreamerStageMetricsHolder[] stageHolders = new StreamerStageMetricsHolder[c.getStages().size()];
 
         int idx = 0;
 
-        for (GridStreamerStage stage : c.getStages()) {
-            stageHolders[idx] = new GridStreamerStageMetricsHolder(stage.name());
+        for (StreamerStage stage : c.getStages()) {
+            stageHolders[idx] = new StreamerStageMetricsHolder(stage.name());
 
             idx++;
         }
 
-        GridStreamerWindowMetricsHolder[] windowHolders = new GridStreamerWindowMetricsHolder[c.getWindows().size()];
+        StreamerWindowMetricsHolder[] windowHolders = new StreamerWindowMetricsHolder[c.getWindows().size()];
 
         idx = 0;
 
-        for (GridStreamerWindow w : c.getWindows()) {
-            windowHolders[idx] = new GridStreamerWindowMetricsHolder(w);
+        for (StreamerWindow w : c.getWindows()) {
+            windowHolders[idx] = new StreamerWindowMetricsHolder(w);
 
             idx++;
         }
@@ -523,13 +523,13 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public <E> GridStreamerWindow<E> window() {
-        return (GridStreamerWindow<E>)dfltWin;
+    @Override public <E> StreamerWindow<E> window() {
+        return (StreamerWindow<E>)dfltWin;
     }
 
     /** {@inheritDoc} */
-    @Override public <E> GridStreamerWindow<E> window(String windowName) {
-        return (GridStreamerWindow<E>)winMap.get(windowName);
+    @Override public <E> StreamerWindow<E> window(String windowName) {
+        return (StreamerWindow<E>)winMap.get(windowName);
     }
 
     /**
@@ -712,7 +712,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
             if (log.isDebugEnabled())
                 log.debug("Scheduling local batch execution [futId=" + futId + ", stageName=" + batch.stageName() + ']');
 
-            GridStreamerStageWrapper wrapper = stages.get(batch.stageName());
+            StreamerStageWrapper wrapper = stages.get(batch.stageName());
 
             if (wrapper == null) {
                 completeParentStage(ctx.localNodeId(), batch.futureId(),
@@ -1226,7 +1226,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
         private GridStreamerExecutionBatch batch;
 
         /** */
-        private GridStreamerStageWrapper stageWrapper;
+        private StreamerStageWrapper stageWrapper;
 
         /** Streamer metrics holder. */
         private StreamerMetricsHolder streamerHolder;
@@ -1246,7 +1246,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
          */
         private BatchWorker(
             GridStreamerExecutionBatch batch,
-            GridStreamerStageWrapper stageWrapper,
+            StreamerStageWrapper stageWrapper,
             StreamerMetricsHolder streamerHolder
         ) {
             super(ctx.gridName(), "streamer-batch-worker-" + batch.stageName(), log);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
index 2eb99ee..445bf9a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
@@ -78,11 +78,11 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
 
     /** */
     @GridToStringInclude
-    private Map<String, GridStreamerStageMetrics> stageMetrics;
+    private Map<String, StreamerStageMetrics> stageMetrics;
 
     /** */
     @GridToStringInclude
-    private Map<String, GridStreamerWindowMetrics> windowMetrics;
+    private Map<String, StreamerWindowMetrics> windowMetrics;
 
     /**
      * Empty constructor.
@@ -117,17 +117,17 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
         stageWaitingExecCnt = metrics.stageWaitingExecutionCount();
 
         // Stage metrics.
-        Map<String, GridStreamerStageMetrics> map = U.newLinkedHashMap(metrics.stageMetrics().size());
+        Map<String, StreamerStageMetrics> map = U.newLinkedHashMap(metrics.stageMetrics().size());
 
-        for (GridStreamerStageMetrics m : metrics.stageMetrics())
-            map.put(m.name(), new GridStreamerStageMetricsAdapter(m));
+        for (StreamerStageMetrics m : metrics.stageMetrics())
+            map.put(m.name(), new StreamerStageMetricsAdapter(m));
 
         stageMetrics = Collections.unmodifiableMap(map);
 
-        Map<String, GridStreamerWindowMetrics> map0 = U.newLinkedHashMap(metrics.windowMetrics().size());
+        Map<String, StreamerWindowMetrics> map0 = U.newLinkedHashMap(metrics.windowMetrics().size());
 
-        for (GridStreamerWindowMetrics m : metrics.windowMetrics())
-            map0.put(m.name(), new GridStreamerWindowMetricsAdapter(m));
+        for (StreamerWindowMetrics m : metrics.windowMetrics())
+            map0.put(m.name(), new StreamerWindowMetricsAdapter(m));
 
         windowMetrics = Collections.unmodifiableMap(map0);
     }
@@ -228,8 +228,8 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
     }
 
     /** {@inheritDoc} */
-    @Override public GridStreamerStageMetrics stageMetrics(String stageName) {
-        GridStreamerStageMetrics metrics = stageMetrics.get(stageName);
+    @Override public StreamerStageMetrics stageMetrics(String stageName) {
+        StreamerStageMetrics metrics = stageMetrics.get(stageName);
 
         if (metrics == null)
             throw new IllegalArgumentException("Streamer stage is not configured: " + stageName);
@@ -238,13 +238,13 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridStreamerStageMetrics> stageMetrics() {
+    @Override public Collection<StreamerStageMetrics> stageMetrics() {
         return stageMetrics.values();
     }
 
     /** {@inheritDoc} */
-    @Override public GridStreamerWindowMetrics windowMetrics(String winName) {
-        GridStreamerWindowMetrics metrics = windowMetrics.get(winName);
+    @Override public StreamerWindowMetrics windowMetrics(String winName) {
+        StreamerWindowMetrics metrics = windowMetrics.get(winName);
 
         if (metrics == null)
             throw new IllegalArgumentException("Streamer window is not configured: " + winName);
@@ -253,7 +253,7 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridStreamerWindowMetrics> windowMetrics() {
+    @Override public Collection<StreamerWindowMetrics> windowMetrics() {
         return windowMetrics.values();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
index eed6408..3ad6b2b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
@@ -89,13 +89,13 @@ public class StreamerMetricsHolder implements StreamerMetrics {
     private LongAdder failuresCnt = new LongAdder();
 
     /** Stages metrics. */
-    private final GridStreamerStageMetricsHolder[] stageMetrics;
+    private final StreamerStageMetricsHolder[] stageMetrics;
 
     /** Stage metrics map. */
-    private final Map<String, GridStreamerStageMetrics> stageMetricsMap;
+    private final Map<String, StreamerStageMetrics> stageMetricsMap;
 
     /** Window metrics map. */
-    private final Map<String, GridStreamerWindowMetrics> windowMetricsMap;
+    private final Map<String, StreamerWindowMetrics> windowMetricsMap;
 
     /** Executor service capacity. */
     private final int execSvcCap;
@@ -106,23 +106,23 @@ public class StreamerMetricsHolder implements StreamerMetrics {
      * @param execSvcCap Executor service capacity.
      */
     public StreamerMetricsHolder(
-        GridStreamerStageMetricsHolder[] stageMetrics,
-        GridStreamerWindowMetricsHolder[] windowMetrics,
+        StreamerStageMetricsHolder[] stageMetrics,
+        StreamerWindowMetricsHolder[] windowMetrics,
         int execSvcCap
     ) {
         this.execSvcCap = execSvcCap;
         this.stageMetrics = stageMetrics;
 
-        Map<String, GridStreamerStageMetrics> map = new LinkedHashMap<>();
+        Map<String, StreamerStageMetrics> map = new LinkedHashMap<>();
 
-        for (GridStreamerStageMetricsHolder holder : stageMetrics)
+        for (StreamerStageMetricsHolder holder : stageMetrics)
             map.put(holder.name(), holder);
 
         stageMetricsMap = Collections.unmodifiableMap(map);
 
-        Map<String, GridStreamerWindowMetrics> map0 = new LinkedHashMap<>();
+        Map<String, StreamerWindowMetrics> map0 = new LinkedHashMap<>();
 
-        for (GridStreamerWindowMetricsHolder holder : windowMetrics)
+        for (StreamerWindowMetricsHolder holder : windowMetrics)
             map0.put(holder.name(), holder);
 
         windowMetricsMap = Collections.unmodifiableMap(map0);
@@ -248,8 +248,8 @@ public class StreamerMetricsHolder implements StreamerMetrics {
     }
 
     /** {@inheritDoc} */
-    @Override public GridStreamerStageMetrics stageMetrics(String stageName) {
-        GridStreamerStageMetrics metrics = stageMetricsMap.get(stageName);
+    @Override public StreamerStageMetrics stageMetrics(String stageName) {
+        StreamerStageMetrics metrics = stageMetricsMap.get(stageName);
 
         if (metrics == null)
             throw new IllegalArgumentException("Streamer stage is not configured: " + stageName);
@@ -258,13 +258,13 @@ public class StreamerMetricsHolder implements StreamerMetrics {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridStreamerStageMetrics> stageMetrics() {
+    @Override public Collection<StreamerStageMetrics> stageMetrics() {
         return stageMetricsMap.values();
     }
 
     /** {@inheritDoc} */
-    @Override public GridStreamerWindowMetrics windowMetrics(String winName) {
-        GridStreamerWindowMetrics metrics = windowMetricsMap.get(winName);
+    @Override public StreamerWindowMetrics windowMetrics(String winName) {
+        StreamerWindowMetrics metrics = windowMetricsMap.get(winName);
 
         if (metrics == null)
             throw new IllegalArgumentException("Streamer window is not configured: " + winName);
@@ -273,7 +273,7 @@ public class StreamerMetricsHolder implements StreamerMetrics {
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<GridStreamerWindowMetrics> windowMetrics() {
+    @Override public Collection<StreamerWindowMetrics> windowMetrics() {
         return windowMetricsMap.values();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java
new file mode 100644
index 0000000..a6101f9
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java
@@ -0,0 +1,93 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+
+/**
+ * Streamer stage MBean adapter: serves {@link StreamerStageMBean} getters from the metrics of one named streamer stage.
+ */
+@SuppressWarnings("ConstantConditions")
+public class StreamerStageMBeanAdapter implements StreamerStageMBean {
+    /** Stage name. */
+    private String stageName;
+
+    /** Stage class name. */
+    private String stageClsName;
+
+    /** Streamer whose {@code metrics()} are queried, by stage name, on every metric getter call. */
+    private IgniteStreamerImpl streamer;
+
+    /**
+     * @param stageName Stage name.
+     * @param stageClsName Stage class name.
+     * @param streamer Streamer implementation whose stage metrics are exposed.
+     */
+    public StreamerStageMBeanAdapter(String stageName, String stageClsName, IgniteStreamerImpl streamer) {
+        this.stageName = stageName;
+        this.stageClsName = stageClsName;
+        this.streamer = streamer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return stageName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getStageClassName() {
+        return stageClsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMinimumExecutionTime() {
+        return streamer.metrics().stageMetrics(stageName).minimumExecutionTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaximumExecutionTime() {
+        return streamer.metrics().stageMetrics(stageName).maximumExecutionTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getAverageExecutionTime() {
+        return streamer.metrics().stageMetrics(stageName).averageExecutionTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMinimumWaitingTime() {
+        return streamer.metrics().stageMetrics(stageName).minimumWaitingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMaximumWaitingTime() {
+        return streamer.metrics().stageMetrics(stageName).maximumWaitingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getAverageWaitingTime() {
+        return streamer.metrics().stageMetrics(stageName).averageWaitingTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getTotalExecutionCount() {
+        return streamer.metrics().stageMetrics(stageName).totalExecutionCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getFailuresCount() {
+        return streamer.metrics().stageMetrics(stageName).failuresCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isExecuting() {
+        return streamer.metrics().stageMetrics(stageName).executing();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java
new file mode 100644
index 0000000..afa7543
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java
@@ -0,0 +1,127 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Streamer stage metrics adapter: keeps stage metric values in plain fields, optionally copied from another instance.
+ */
+public class StreamerStageMetricsAdapter implements StreamerStageMetrics {
+    /** Stage name. */
+    private String name;
+
+    /** Minimum execution time. */
+    private long minExecTime;
+
+    /** Maximum execution time. */
+    private long maxExecTime;
+
+    /** Average execution time. */
+    private long avgExecTime;
+
+    /** Minimum waiting time. */
+    private long minWaitTime;
+
+    /** Maximum waiting time. */
+    private long maxWaitTime;
+
+    /** Average waiting time. */
+    private long avgWaitTime;
+
+    /** Total execution count. */
+    private long totalExecCnt;
+
+    /** Failures count. */
+    private int failuresCnt;
+
+    /** Executing flag, as reported by the copied metrics. */
+    private boolean executing;
+
+    /**
+     * Empty constructor; all fields keep their Java default values.
+     */
+    public StreamerStageMetricsAdapter() {
+        // No-op.
+    }
+
+    /**
+     * @param metrics Metrics to copy.
+     */
+    public StreamerStageMetricsAdapter(StreamerStageMetrics metrics) {
+        // Preserve alphabetic order for maintenance.
+        avgExecTime = metrics.averageExecutionTime();
+        avgWaitTime = metrics.averageWaitingTime();
+        executing = metrics.executing();
+        failuresCnt = metrics.failuresCount();
+        maxExecTime = metrics.maximumExecutionTime();
+        maxWaitTime = metrics.maximumWaitingTime();
+        minExecTime = metrics.minimumExecutionTime();
+        minWaitTime = metrics.minimumWaitingTime();
+        name = metrics.name();
+        totalExecCnt = metrics.totalExecutionCount();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long minimumExecutionTime() {
+        return minExecTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long maximumExecutionTime() {
+        return maxExecTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long averageExecutionTime() {
+        return avgExecTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long totalExecutionCount() {
+        return totalExecCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long minimumWaitingTime() {
+        return minWaitTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long maximumWaitingTime() {
+        return maxWaitTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long averageWaitingTime() {
+        return avgWaitTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int failuresCount() {
+        return failuresCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean executing() {
+        return executing;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StreamerStageMetricsAdapter.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java
new file mode 100644
index 0000000..cb76166
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java
@@ -0,0 +1,159 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.*;
+import org.jdk8.backport.*;
+
+/**
+ * Streamer stage metrics holder: accumulates timings and counters reported through the callbacks below.
+ */
+public class StreamerStageMetricsHolder implements StreamerStageMetrics {
+    /** Stage name. */
+    private String name;
+
+    /** Minimum execution time; starts at {@code Long.MAX_VALUE}, which the getter reports as 0. */
+    private GridAtomicLong minExecTime = new GridAtomicLong(Long.MAX_VALUE);
+
+    /** Maximum execution time. */
+    private GridAtomicLong maxExecTime = new GridAtomicLong();
+
+    /** Sum of stage execution times; divided by the total execution count to get the average. */
+    private LongAdder sumExecTime = new LongAdder();
+
+    /** Stage minimum waiting time; starts at {@code Long.MAX_VALUE}, which the getter reports as 0. */
+    private GridAtomicLong minWaitTime = new GridAtomicLong(Long.MAX_VALUE);
+
+    /** Stage maximum waiting time. */
+    private GridAtomicLong maxWaitTime = new GridAtomicLong();
+
+    /** Sum of stage waiting times (not an average); divided by the total execution count on read. */
+    private LongAdder sumWaitTime = new LongAdder();
+
+    /** Total number of times this stage was executed; incremented when an execution finishes. */
+    private LongAdder totalExecCnt = new LongAdder();
+
+    /** Failures count. */
+    private LongAdder failuresCnt = new LongAdder();
+
+    /** Number of executions currently in progress (started callbacks minus finished callbacks). */
+    private LongAdder curActive = new LongAdder();
+
+    /**
+     * @param name Stage name.
+     */
+    public StreamerStageMetricsHolder(String name) {
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} Returns 0 while the stored minimum is still the initial {@code Long.MAX_VALUE}. */
+    @Override public long minimumExecutionTime() {
+        long min = minExecTime.get();
+
+        return min == Long.MAX_VALUE ? 0 : min;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long maximumExecutionTime() {
+        return maxExecTime.get();
+    }
+
+    /** {@inheritDoc} Integer division of the execution time sum by the execution count; 0 if the count is 0. */
+    @Override public long averageExecutionTime() {
+        long execTime = sumExecTime.sum();
+
+        long execs = totalExecCnt.sum();
+
+        return execs == 0 ? 0 : execTime / execs;
+    }
+
+    /** {@inheritDoc} Returns 0 while the stored minimum is still the initial {@code Long.MAX_VALUE}. */
+    @Override public long minimumWaitingTime() {
+        long min = minWaitTime.get();
+
+        return min == Long.MAX_VALUE ? 0 : min;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long maximumWaitingTime() {
+        return maxWaitTime.get();
+    }
+
+    /** {@inheritDoc} Wait time is summed on start but the divisor grows on finish, so in-flight runs inflate it. */
+    @Override public long averageWaitingTime() {
+        long waitTime = sumWaitTime.sum();
+
+        long execs = totalExecCnt.sum();
+
+        return execs == 0 ? 0 : waitTime / execs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long totalExecutionCount() {
+        return totalExecCnt.longValue();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int failuresCount() {
+        return failuresCnt.intValue();
+    }
+
+    /** {@inheritDoc} True while the in-progress counter is positive. */
+    @Override public boolean executing() {
+        return curActive.intValue() > 0;
+    }
+
+    /**
+     * Execution started callback: bumps the in-progress counter and records the wait time in min/max/sum.
+     *
+     * @param waitTime Wait time; negative values are treated as 0.
+     */
+    public void onExecutionStarted(long waitTime) {
+        if (waitTime < 0)
+            waitTime = 0;
+
+        curActive.increment();
+
+        maxWaitTime.setIfGreater(waitTime);
+        minWaitTime.setIfLess(waitTime);
+        sumWaitTime.add(waitTime);
+    }
+
+    /**
+     * Execution finished callback: lowers the in-progress counter, records the time, counts the execution.
+     *
+     * @param execTime Stage execution time; negative values are treated as 0.
+     */
+    public void onExecutionFinished(long execTime) {
+        if (execTime < 0)
+            execTime = 0;
+
+        curActive.decrement();
+
+        maxExecTime.setIfGreater(execTime);
+        minExecTime.setIfLess(execTime);
+        sumExecTime.add(execTime);
+
+        totalExecCnt.increment();
+    }
+
+    /**
+     * Failure callback: increments the failures counter and touches nothing else.
+     */
+    public void onFailure() {
+        failuresCnt.increment();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java
new file mode 100644
index 0000000..4f55a78
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java
@@ -0,0 +1,84 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Stage wrapper: stores stage index and next stage name, delegates the rest (metrics are not computed in this class).
+ */
+public class StreamerStageWrapper implements StreamerStage<Object> {
+    /** Stage delegate. */
+    private StreamerStage<Object> delegate;
+
+    /** Stage index. */
+    private int idx;
+
+    /** Next stage name. Set after creation through {@link #nextStageName(String)}. */
+    private String nextStageName;
+
+    /**
+     * @param delegate Delegate stage.
+     * @param idx Index.
+     */
+    public StreamerStageWrapper(StreamerStage<Object> delegate, int idx) {
+        this.delegate = delegate;
+        this.idx = idx;
+    }
+
+    /**
+     * @return Stage index.
+     */
+    public int index() {
+        return idx;
+    }
+
+    /**
+     * @return Next stage name in pipeline or {@code null} if this is the last stage.
+     */
+    @Nullable public String nextStageName() {
+        return nextStageName;
+    }
+
+    /**
+     * @param nextStageName Next stage name in pipeline.
+     */
+    public void nextStageName(String nextStageName) {
+        this.nextStageName = nextStageName;
+    }
+
+    /** {@inheritDoc} Taken from the delegate stage. */
+    @Override public String name() {
+        return delegate.name();
+    }
+
+    /** {@inheritDoc} Passes the call straight to the delegate and returns its result unchanged. */
+    @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts)
+        throws GridException {
+        return delegate.run(ctx, evts);
+    }
+
+    /**
+     * @return Wrapped delegate stage (declared as a raw {@code StreamerStage}).
+     */
+    public StreamerStage unwrap() {
+        return delegate;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StreamerStageWrapper.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java
new file mode 100644
index 0000000..b181fa7
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java
@@ -0,0 +1,57 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Streamer window metrics adapter: holds a copy of another metrics object's values taken at construction.
+ */
+public class StreamerWindowMetricsAdapter implements StreamerWindowMetrics {
+    /** Window name. */
+    private String name;
+
+    /** Window size. */
+    private int size;
+
+    /** Window eviction queue size. */
+    private int evictionQueueSize;
+
+    /**
+     * @param m Metrics to copy; dereferenced immediately, so it must not be {@code null}.
+     */
+    public StreamerWindowMetricsAdapter(StreamerWindowMetrics m) {
+        // Preserve alphabetic order for maintenance.
+        evictionQueueSize = m.evictionQueueSize();
+        name = m.name();
+        size = m.size();
+    }
+
+    /** {@inheritDoc} Returns the copied value. */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} Returns the copied value. */
+    @Override public int size() {
+        return size;
+    }
+
+    /** {@inheritDoc} Returns the copied value. */
+    @Override public int evictionQueueSize() {
+        return evictionQueueSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StreamerWindowMetricsAdapter.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java
new file mode 100644
index 0000000..e7a1e02
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java
@@ -0,0 +1,42 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+
+/**
+ * Streamer window metrics holder: a live view that reads every value from the wrapped window on each call.
+ */
+public class StreamerWindowMetricsHolder implements StreamerWindowMetrics {
+    /** Window instance all metric calls are forwarded to. */
+    private StreamerWindow window;
+
+    /**
+     * @param window Streamer window.
+     */
+    public StreamerWindowMetricsHolder(StreamerWindow window) {
+        this.window = window;
+    }
+
+    /** {@inheritDoc} Read from the window on every call. */
+    @Override public String name() {
+        return window.name();
+    }
+
+    /** {@inheritDoc} Read from the window on every call. */
+    @Override public int size() {
+        return window.size();
+    }
+
+    /** {@inheritDoc} Read from the window on every call. */
+    @Override public int evictionQueueSize() {
+        return window.evictionQueueSize();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
index 08b6efe..1717333 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
@@ -81,7 +81,7 @@ public class VisorStreamerMetrics implements Serializable {
 
         int windowSz = 0;
 
-        for (GridStreamerWindowMetrics wm : m.windowMetrics())
+        for (StreamerWindowMetrics wm : m.windowMetrics())
             windowSz += wm.size();
 
         VisorStreamerMetrics metrics = new VisorStreamerMetrics();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
index 0462476..ade0027 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
@@ -17,7 +17,7 @@ import java.io.*;
 import java.util.*;
 
 /**
- * Data transfer object for {@link GridStreamerStageMetrics}.
+ * Data transfer object for {@link org.gridgain.grid.streamer.StreamerStageMetrics}.
  */
 public class VisorStreamerStageMetrics implements Serializable {
     /** */
@@ -60,7 +60,7 @@ public class VisorStreamerStageMetrics implements Serializable {
     private int failuresFreq = -1;
 
     /** Create data transfer object for given metrics. */
-    public static VisorStreamerStageMetrics from(GridStreamerStageMetrics m) {
+    public static VisorStreamerStageMetrics from(StreamerStageMetrics m) {
         assert m != null;
 
         VisorStreamerStageMetrics metrics = new VisorStreamerStageMetrics();
@@ -88,7 +88,7 @@ public class VisorStreamerStageMetrics implements Serializable {
 
         Collection<VisorStreamerStageMetrics> res = new ArrayList<>();
 
-        for (GridStreamerStageMetrics m : streamer.metrics().stageMetrics())
+        for (StreamerStageMetrics m : streamer.metrics().stageMetrics())
             res.add(from(m));
 
         return res;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
deleted file mode 100644
index b994f4e..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.gridgain.grid.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer stage is a component that determines event processing flow. User logic related to
- * any particular event processing is implemented by streamer stage. A stage takes events as
- * an input and returns groups of events mapped to different stages as an output. Events for
- * every returned stage will be passed to {@link StreamerEventRouter} which will determine
- * on which node the stage should be executed.
- * <p>
- * Generally, event stage execution graph if fully controlled by return values of
- * this method, while node execution graph is controlled by
- * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
- */
-public interface GridStreamerStage<IN> {
-    /**
-     * Gets streamer stage name.
-     *
-     * @return Name of the stage.
-     */
-    public String name();
-
-    /**
-     * Stage execution routine. After the passed in events are processed, stage can emit
-     * another set of events to be processed. The returned events can be mapped to different
-     * stages. Events for every returned stage will be passed to {@link StreamerEventRouter}
-     * which will determine on which node the stage should be executed.
-     * <p>
-     * Generally, event stage execution graph if fully controlled by return values of
-     * this method, while node execution graph is controlled by
-     * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
-     *
-     * @param ctx Streamer context.
-     * @param evts Input events.
-     * @return Map of stage name to collection of events.
-     * @throws GridException If failed.
-     */
-    @Nullable public Map<String, Collection<?>> run(StreamerContext ctx, Collection<IN> evts)
-        throws GridException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java
deleted file mode 100644
index 7505d2f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.apache.ignite.mbean.*;
-
-/**
- * Streamer stage MBean.
- */
-@IgniteMBeanDescription("MBean that provides access to streamer stage description and metrics.")
-public interface GridStreamerStageMBean {
-    /**
-     * Gets stage name.
-     *
-     * @return Stage name.
-     */
-    @IgniteMBeanDescription("Stage name.")
-    public String getName();
-
-    /**
-     * Gets stage class name.
-     *
-     * @return Stage class name.
-     */
-    @IgniteMBeanDescription("Stage class name.")
-    public String getStageClassName();
-
-    /**
-     * Gets stage minimum execution time.
-     *
-     * @return Stage minimum execution time.
-     */
-    @IgniteMBeanDescription("Stage minimum execution time.")
-    public long getMinimumExecutionTime();
-
-    /**
-     * Gets stage maximum execution time.
-     *
-     * @return Stage maximum execution time.
-     */
-    @IgniteMBeanDescription("Stage maximum execution time.")
-    public long getMaximumExecutionTime();
-
-    /**
-     * Gets stage average execution time.
-     *
-     * @return Stage average execution time.
-     */
-    @IgniteMBeanDescription("Stage average execution time.")
-    public long getAverageExecutionTime();
-
-    /**
-     * Gets stage minimum waiting time.
-     *
-     * @return Stage minimum waiting time.
-     */
-    @IgniteMBeanDescription("Stage minimum waiting time.")
-    public long getMinimumWaitingTime();
-
-    /**
-     * Gets stage maximum waiting time.
-     *
-     * @return Stage maximum waiting time.
-     */
-    @IgniteMBeanDescription("Stage maximum waiting time.")
-    public long getMaximumWaitingTime();
-
-    /**
-     * Stage average waiting time.
-     *
-     * @return Stage average waiting time.
-     */
-    @IgniteMBeanDescription("Stage average waiting time.")
-    public long getAverageWaitingTime();
-
-    /**
-     * Gets total stage execution count since last reset.
-     *
-     * @return Number of times this stage was executed.
-     */
-    @IgniteMBeanDescription("Number of times this stage was executed.")
-    public long getTotalExecutionCount();
-
-    /**
-     * Gets stage failure count.
-     *
-     * @return Stage failure count.
-     */
-    @IgniteMBeanDescription("Stage failure count.")
-    public int getFailuresCount();
-
-    /**
-     * Gets flag indicating if stage is being currently executed by at least one thread on current node.
-     *
-     * @return {@code True} if stage is executing now.
-     */
-    @IgniteMBeanDescription("Whether stage is currently being executed.")
-    public boolean isExecuting();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java
deleted file mode 100644
index f0beeb5..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-/**
- * Streamer stage metrics.
- */
-public interface GridStreamerStageMetrics {
-    /**
-     * Gets stage name.
-     *
-     * @return Stage name.
-     */
-    public String name();
-
-    /**
-     * Gets stage minimum execution time.
-     *
-     * @return Stage minimum execution time.
-     */
-    public long minimumExecutionTime();
-
-    /**
-     * Gets stage maximum execution time.
-     *
-     * @return Stage maximum execution time.
-     */
-    public long maximumExecutionTime();
-
-    /**
-     * Gets stage average execution time.
-     *
-     * @return Stage average execution time.
-     */
-    public long averageExecutionTime();
-
-    /**
-     * Gets stage minimum waiting time.
-     *
-     * @return Stage minimum waiting time.
-     */
-    public long minimumWaitingTime();
-
-    /**
-     * Gets stage maximum waiting time.
-     *
-     * @return Stage maximum waiting time.
-     */
-    public long maximumWaitingTime();
-
-    /**
-     * Stage average waiting time.
-     *
-     * @return Stage average waiting time.
-     */
-    public long averageWaitingTime();
-
-    /**
-     * Gets total stage execution count since last reset.
-     *
-     * @return Number of times this stage was executed.
-     */
-    public long totalExecutionCount();
-
-    /**
-     * Gets stage failure count.
-     *
-     * @return Stage failure count.
-     */
-    public int failuresCount();
-
-    /**
-     * Gets flag indicating if stage is being currently executed by at least one thread on current node.
-     *
-     * @return {@code True} if stage is executing now.
-     */
-    public boolean executing();
-}