You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:12 UTC
[11/32] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/34694f39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/34694f39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/34694f39
Branch: refs/heads/master
Commit: 34694f39afff2b40cf01a84a7bb81a13b9d1341e
Parents: e9fe37f
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:43:39 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:43:39 2014 +0300
----------------------------------------------------------------------
.../streaming/StreamingCheckInExample.java | 10 +-
.../StreamingPopularNumbersExample.java | 4 +-
.../streaming/StreamingPriceBarsExample.java | 8 +-
.../StreamingRunningAverageExample.java | 4 +-
.../java/org/apache/ignite/IgniteStreamer.java | 6 +-
.../streamer/GridStreamProcessor.java | 14 +-
.../streamer/GridStreamerAttributes.java | 2 +-
.../streamer/GridStreamerContextDelegate.java | 4 +-
.../streamer/GridStreamerContextImpl.java | 6 +-
.../streamer/GridStreamerStageMBeanAdapter.java | 93 ---------
.../GridStreamerStageMetricsAdapter.java | 127 ------------
.../GridStreamerStageMetricsHolder.java | 159 ---------------
.../streamer/GridStreamerStageWrapper.java | 84 --------
.../GridStreamerWindowMetricsAdapter.java | 57 ------
.../GridStreamerWindowMetricsHolder.java | 42 ----
.../processors/streamer/IgniteStreamerEx.java | 4 +-
.../processors/streamer/IgniteStreamerImpl.java | 48 ++---
.../streamer/StreamerMetricsAdapter.java | 28 +--
.../streamer/StreamerMetricsHolder.java | 30 +--
.../streamer/StreamerStageMBeanAdapter.java | 93 +++++++++
.../streamer/StreamerStageMetricsAdapter.java | 127 ++++++++++++
.../streamer/StreamerStageMetricsHolder.java | 159 +++++++++++++++
.../streamer/StreamerStageWrapper.java | 84 ++++++++
.../streamer/StreamerWindowMetricsAdapter.java | 57 ++++++
.../streamer/StreamerWindowMetricsHolder.java | 42 ++++
.../visor/streamer/VisorStreamerMetrics.java | 2 +-
.../streamer/VisorStreamerStageMetrics.java | 6 +-
.../grid/streamer/GridStreamerStage.java | 53 -----
.../grid/streamer/GridStreamerStageMBean.java | 106 ----------
.../grid/streamer/GridStreamerStageMetrics.java | 85 --------
.../grid/streamer/GridStreamerWindow.java | 199 -------------------
.../grid/streamer/GridStreamerWindowMBean.java | 50 -----
.../streamer/GridStreamerWindowMetrics.java | 36 ----
.../grid/streamer/StreamerConfiguration.java | 12 +-
.../gridgain/grid/streamer/StreamerContext.java | 6 +-
.../gridgain/grid/streamer/StreamerMetrics.java | 8 +-
.../gridgain/grid/streamer/StreamerStage.java | 53 +++++
.../grid/streamer/StreamerStageMBean.java | 106 ++++++++++
.../grid/streamer/StreamerStageMetrics.java | 85 ++++++++
.../gridgain/grid/streamer/StreamerWindow.java | 199 +++++++++++++++++++
.../grid/streamer/StreamerWindowMBean.java | 50 +++++
.../grid/streamer/StreamerWindowMetrics.java | 36 ++++
.../grid/streamer/index/GridStreamerIndex.java | 9 +-
.../index/GridStreamerIndexProvider.java | 3 +-
.../window/GridStreamerWindowAdapter.java | 4 +-
.../streamer/GridStreamerEvictionSelfTest.java | 8 +-
.../streamer/GridStreamerFailoverSelfTest.java | 4 +-
.../GridStreamerLifecycleAwareSelfTest.java | 8 +-
.../streamer/GridStreamerSelfTest.java | 24 +--
.../processors/streamer/GridTestStage.java | 2 +-
.../marshaller/GridMarshallerAbstractTest.java | 4 +-
.../window/GridStreamerWindowSelfTest.java | 6 +-
.../loadtests/streamer/average/TestStage.java | 4 +-
53 files changed, 1229 insertions(+), 1231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
index c4a79df..f10dec1 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
@@ -165,7 +165,7 @@ public class StreamingCheckInExample {
new IgniteClosure<StreamerContext, Map<String, Place>>() {
@Override public Map<String, Place> apply(
StreamerContext ctx) {
- GridStreamerWindow<LocationInfo> win =
+ StreamerWindow<LocationInfo> win =
ctx.window(DetectPlacesStage.class.getSimpleName());
assert win != null;
@@ -416,7 +416,7 @@ public class StreamingCheckInExample {
* with unique index to block repetitive check-ins.
*/
@SuppressWarnings("PublicInnerClass")
- public static class AddToWindowStage implements GridStreamerStage<CheckInEvent> {
+ public static class AddToWindowStage implements StreamerStage<CheckInEvent> {
/** {@inheritDoc} */
@Override public String name() {
return getClass().getSimpleName();
@@ -425,7 +425,7 @@ public class StreamingCheckInExample {
/** {@inheritDoc} */
@Nullable @Override public Map<String, Collection<?>> run(
StreamerContext ctx, Collection<CheckInEvent> evts) throws GridException {
- GridStreamerWindow<CheckInEvent> win = ctx.window(name());
+ StreamerWindow<CheckInEvent> win = ctx.window(name());
assert win != null;
@@ -464,7 +464,7 @@ public class StreamingCheckInExample {
* Check-in event processing stage that detects the
* check-in places.
*/
- private static class DetectPlacesStage implements GridStreamerStage<CheckInEvent> {
+ private static class DetectPlacesStage implements StreamerStage<CheckInEvent> {
/** {@inheritDoc} */
@Override public String name() {
return getClass().getSimpleName();
@@ -473,7 +473,7 @@ public class StreamingCheckInExample {
/** {@inheritDoc} */
@Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx,
Collection<CheckInEvent> evts) throws GridException {
- GridStreamerWindow<LocationInfo> win = ctx.window(name());
+ StreamerWindow<LocationInfo> win = ctx.window(name());
assert win != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
index 75208d8..8794ba9 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
@@ -193,7 +193,7 @@ public class StreamingPopularNumbersExample {
* Sample streamer stage to compute average.
*/
@SuppressWarnings("PublicInnerClass")
- public static class StreamerStage implements GridStreamerStage<Integer> {
+ public static class StreamerStage implements org.gridgain.grid.streamer.StreamerStage<Integer> {
/** {@inheritDoc} */
@Override public String name() {
return "exampleStage";
@@ -202,7 +202,7 @@ public class StreamingPopularNumbersExample {
/** {@inheritDoc} */
@Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> nums)
throws GridException {
- GridStreamerWindow<Integer> win = ctx.window();
+ StreamerWindow<Integer> win = ctx.window();
// Add numbers to window.
win.enqueueAll(nums);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
index 918d783..ac64a93 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPriceBarsExample.java
@@ -364,7 +364,7 @@ public class StreamingPriceBarsExample {
* The first stage where 1 second bars are built.
*/
@SuppressWarnings({ "PublicInnerClass", "unchecked" })
- public static class FirstStage implements GridStreamerStage<Quote> {
+ public static class FirstStage implements StreamerStage<Quote> {
/** {@inheritDoc} */
@Override public String name() {
return getClass().getSimpleName();
@@ -373,7 +373,7 @@ public class StreamingPriceBarsExample {
/** {@inheritDoc} */
@Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Quote> quotes)
throws GridException {
- GridStreamerWindow win = ctx.window("stage1");
+ StreamerWindow win = ctx.window("stage1");
// Add numbers to window.
win.enqueueAll(quotes);
@@ -405,7 +405,7 @@ public class StreamingPriceBarsExample {
* The second stage where 2 second bars are built.
*/
@SuppressWarnings({ "PublicInnerClass", "unchecked" })
- public static class SecondStage implements GridStreamerStage<Bar> {
+ public static class SecondStage implements StreamerStage<Bar> {
/** {@inheritDoc} */
@Override public String name() {
return getClass().getSimpleName();
@@ -416,7 +416,7 @@ public class StreamingPriceBarsExample {
throws GridException {
ConcurrentMap<String, Bar> loc = ctx.localSpace();
- GridStreamerWindow win = ctx.window("stage2");
+ StreamerWindow win = ctx.window("stage2");
// Add numbers to window.
win.enqueueAll(bars);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
index b59aaf2..6a62456 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingRunningAverageExample.java
@@ -141,7 +141,7 @@ public class StreamingRunningAverageExample {
/**
* Sample streamer stage to compute average.
*/
- public static class StreamerStage implements GridStreamerStage<Integer> {
+ public static class StreamerStage implements org.gridgain.grid.streamer.StreamerStage<Integer> {
/** {@inheritDoc} */
@Override public String name() {
return "exampleStage";
@@ -166,7 +166,7 @@ public class StreamingRunningAverageExample {
for (Integer e : evts)
avg.add(e, 1);
- GridStreamerWindow<Integer> win = ctx.window();
+ StreamerWindow<Integer> win = ctx.window();
// Add input events to window.
win.enqueueAll(evts);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
index 945cff2..89ce78d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteStreamer.java
@@ -18,7 +18,7 @@ import java.util.*;
/**
* Streamer interface. Streamer provides an easy way to process large (possibly infinite) stream of
* events. Event can be of any object type, different types of events can be submitted to streamer. Each event
- * is processed by one or more {@link org.gridgain.grid.streamer.GridStreamerStage}, a set of stages event passed through is called pipeline.
+ * is processed by one or more {@link org.gridgain.grid.streamer.StreamerStage}, a set of stages event passed through is called pipeline.
* <p>
* For each submitted group of events streamer determines one or more execution nodes that will process this
* group of events. Execution nodes are determined by {@link org.gridgain.grid.streamer.StreamerEventRouter}. Execution nodes run stages
@@ -36,7 +36,7 @@ import java.util.*;
* of failure and will try to execute pipeline from the beginning. If failover cannot be succeeded or maximum number
* of failover attempts is exceeded, then listener will be notified on node which originated pipeline execution.
*
- * @see org.gridgain.grid.streamer.GridStreamerStage
+ * @see org.gridgain.grid.streamer.StreamerStage
* @see org.gridgain.grid.streamer.StreamerEventRouter
*/
public interface IgniteStreamer {
@@ -124,7 +124,7 @@ public interface IgniteStreamer {
public StreamerMetrics metrics();
/**
- * Resets all configured streamer windows by calling {@link GridStreamerWindow#reset()} on each and
+ * Resets all configured streamer windows by calling {@link org.gridgain.grid.streamer.StreamerWindow#reset()} on each and
* clears local space.
* <p>
* This is local method, it will clear only local windows and local space. Note that windows and
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
index 8330a06..10cb8ac 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
@@ -76,11 +76,11 @@ public class GridStreamProcessor extends GridProcessorAdapter {
}
// Add mbeans for stages.
- for (GridStreamerStage stage : s.configuration().getStages()) {
+ for (StreamerStage stage : s.configuration().getStages()) {
try {
mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()), "Stage-" + stage.name(),
- new GridStreamerStageMBeanAdapter(stage.name(), stage.getClass().getName(), s),
- GridStreamerStageMBean.class));
+ new StreamerStageMBeanAdapter(stage.name(), stage.getClass().getName(), s),
+ StreamerStageMBean.class));
if (log.isDebugEnabled())
log.debug("Registered MBean for streamer stage [streamer=" + s.name() +
@@ -93,13 +93,13 @@ public class GridStreamProcessor extends GridProcessorAdapter {
}
// Add mbeans for windows.
- for (GridStreamerWindow win : s.configuration().getWindows()) {
+ for (StreamerWindow win : s.configuration().getWindows()) {
try {
- if (hasInterface(win.getClass(), GridStreamerWindowMBean.class)) {
+ if (hasInterface(win.getClass(), StreamerWindowMBean.class)) {
mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()),
"Window-" + win.name(),
- (GridStreamerWindowMBean)win,
- GridStreamerWindowMBean.class));
+ (StreamerWindowMBean)win,
+ StreamerWindowMBean.class));
if (log.isDebugEnabled())
log.debug("Registered MBean for streamer window [streamer=" + s.name() +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
index b7849e4..18b0701 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerAttributes.java
@@ -59,7 +59,7 @@ public class GridStreamerAttributes implements Externalizable {
stages = new LinkedList<>();
if (!F.isEmpty(cfg.getStages())) {
- for (GridStreamerStage stage : cfg.getStages())
+ for (StreamerStage stage : cfg.getStages())
stages.add(F.t(stage.name(), stage.getClass().getName()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
index fb8e1bf..064e3db 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextDelegate.java
@@ -48,12 +48,12 @@ public class GridStreamerContextDelegate implements StreamerContext {
}
/** {@inheritDoc} */
- @Override public <E> GridStreamerWindow<E> window() {
+ @Override public <E> StreamerWindow<E> window() {
return delegate.window();
}
/** {@inheritDoc} */
- @Override public <E> GridStreamerWindow<E> window(String winName) {
+ @Override public <E> StreamerWindow<E> window(String winName) {
return delegate.window(winName);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
index c8320ab..e9c64ca 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerContextImpl.java
@@ -74,13 +74,13 @@ public class GridStreamerContextImpl implements StreamerContext {
}
/** {@inheritDoc} */
- @Override public <E> GridStreamerWindow<E> window() {
+ @Override public <E> StreamerWindow<E> window() {
return streamer.window();
}
/** {@inheritDoc} */
- @Override public <E> GridStreamerWindow<E> window(String winName) {
- GridStreamerWindow<E> window = streamer.window(winName);
+ @Override public <E> StreamerWindow<E> window(String winName) {
+ StreamerWindow<E> window = streamer.window(winName);
if (window == null)
throw new IllegalArgumentException("Streamer window is not configured: " + winName);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java
deleted file mode 100644
index 1a8ba97..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMBeanAdapter.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-
-/**
- * Streamer stage MBean adapter.
- */
-@SuppressWarnings("ConstantConditions")
-public class GridStreamerStageMBeanAdapter implements GridStreamerStageMBean {
- /** Stage name. */
- private String stageName;
-
- /** Stage class name. */
- private String stageClsName;
-
- /** */
- private IgniteStreamerImpl streamer;
-
- /**
- * @param stageName Stage name.
- * @param stageClsName Stage class name.
- * @param streamer Streamer implementation.
- */
- public GridStreamerStageMBeanAdapter(String stageName, String stageClsName, IgniteStreamerImpl streamer) {
- this.stageName = stageName;
- this.stageClsName = stageClsName;
- this.streamer = streamer;
- }
-
- /** {@inheritDoc} */
- @Override public String getName() {
- return stageName;
- }
-
- /** {@inheritDoc} */
- @Override public String getStageClassName() {
- return stageClsName;
- }
-
- /** {@inheritDoc} */
- @Override public long getMinimumExecutionTime() {
- return streamer.metrics().stageMetrics(stageName).minimumExecutionTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getMaximumExecutionTime() {
- return streamer.metrics().stageMetrics(stageName).maximumExecutionTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getAverageExecutionTime() {
- return streamer.metrics().stageMetrics(stageName).averageExecutionTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getMinimumWaitingTime() {
- return streamer.metrics().stageMetrics(stageName).minimumWaitingTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getMaximumWaitingTime() {
- return streamer.metrics().stageMetrics(stageName).maximumWaitingTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getAverageWaitingTime() {
- return streamer.metrics().stageMetrics(stageName).averageWaitingTime();
- }
-
- /** {@inheritDoc} */
- @Override public long getTotalExecutionCount() {
- return streamer.metrics().stageMetrics(stageName).totalExecutionCount();
- }
-
- /** {@inheritDoc} */
- @Override public int getFailuresCount() {
- return streamer.metrics().stageMetrics(stageName).failuresCount();
- }
-
- /** {@inheritDoc} */
- @Override public boolean isExecuting() {
- return streamer.metrics().stageMetrics(stageName).executing();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java
deleted file mode 100644
index e73ff4f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsAdapter.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Streamer stage metrics adapter.
- */
-public class GridStreamerStageMetricsAdapter implements GridStreamerStageMetrics {
- /** */
- private String name;
-
- /** */
- private long minExecTime;
-
- /** */
- private long maxExecTime;
-
- /** */
- private long avgExecTime;
-
- /** */
- private long minWaitTime;
-
- /** */
- private long maxWaitTime;
-
- /** */
- private long avgWaitTime;
-
- /** */
- private long totalExecCnt;
-
- /** */
- private int failuresCnt;
-
- /** */
- private boolean executing;
-
- /**
- * Empty constructor.
- */
- public GridStreamerStageMetricsAdapter() {
- // No-op.
- }
-
- /**
- * @param metrics Metrics.
- */
- public GridStreamerStageMetricsAdapter(GridStreamerStageMetrics metrics) {
- // Preserve alphabetic order for maintenance.
- avgExecTime = metrics.averageExecutionTime();
- avgWaitTime = metrics.averageWaitingTime();
- executing = metrics.executing();
- failuresCnt = metrics.failuresCount();
- maxExecTime = metrics.maximumExecutionTime();
- maxWaitTime = metrics.maximumWaitingTime();
- minExecTime = metrics.minimumExecutionTime();
- minWaitTime = metrics.minimumWaitingTime();
- name = metrics.name();
- totalExecCnt = metrics.totalExecutionCount();
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public long minimumExecutionTime() {
- return minExecTime;
- }
-
- /** {@inheritDoc} */
- @Override public long maximumExecutionTime() {
- return maxExecTime;
- }
-
- /** {@inheritDoc} */
- @Override public long averageExecutionTime() {
- return avgExecTime;
- }
-
- /** {@inheritDoc} */
- @Override public long totalExecutionCount() {
- return totalExecCnt;
- }
-
- /** {@inheritDoc} */
- @Override public long minimumWaitingTime() {
- return minWaitTime;
- }
-
- /** {@inheritDoc} */
- @Override public long maximumWaitingTime() {
- return maxWaitTime;
- }
-
- /** {@inheritDoc} */
- @Override public long averageWaitingTime() {
- return avgWaitTime;
- }
-
- /** {@inheritDoc} */
- @Override public int failuresCount() {
- return failuresCnt;
- }
-
- /** {@inheritDoc} */
- @Override public boolean executing() {
- return executing;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridStreamerStageMetricsAdapter.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java
deleted file mode 100644
index e45830c..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageMetricsHolder.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.*;
-import org.jdk8.backport.*;
-
-/**
- * Streamer stage metrics holder.
- */
-public class GridStreamerStageMetricsHolder implements GridStreamerStageMetrics {
- /** Stage name. */
- private String name;
-
- /** Minimum execution time. */
- private GridAtomicLong minExecTime = new GridAtomicLong(Long.MAX_VALUE);
-
- /** Maximum execution time. */
- private GridAtomicLong maxExecTime = new GridAtomicLong();
-
- /** Stage execution time sum. */
- private LongAdder sumExecTime = new LongAdder();
-
- /** Stage minimum waiting time. */
- private GridAtomicLong minWaitTime = new GridAtomicLong(Long.MAX_VALUE);
-
- /** Stage maximum waiting time. */
- private GridAtomicLong maxWaitTime = new GridAtomicLong();
-
- /** Stage average waiting time sum. */
- private LongAdder sumWaitTime = new LongAdder();
-
- /** Total number of times this stage was executed. */
- private LongAdder totalExecCnt = new LongAdder();
-
- /** Failures count. */
- private LongAdder failuresCnt = new LongAdder();
-
- /** Number of threads executing this stage. */
- private LongAdder curActive = new LongAdder();
-
- /**
- * @param name Stage name.
- */
- public GridStreamerStageMetricsHolder(String name) {
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public long minimumExecutionTime() {
- long min = minExecTime.get();
-
- return min == Long.MAX_VALUE ? 0 : min;
- }
-
- /** {@inheritDoc} */
- @Override public long maximumExecutionTime() {
- return maxExecTime.get();
- }
-
- /** {@inheritDoc} */
- @Override public long averageExecutionTime() {
- long execTime = sumExecTime.sum();
-
- long execs = totalExecCnt.sum();
-
- return execs == 0 ? 0 : execTime / execs;
- }
-
- /** {@inheritDoc} */
- @Override public long minimumWaitingTime() {
- long min = minWaitTime.get();
-
- return min == Long.MAX_VALUE ? 0 : min;
- }
-
- /** {@inheritDoc} */
- @Override public long maximumWaitingTime() {
- return maxWaitTime.get();
- }
-
- /** {@inheritDoc} */
- @Override public long averageWaitingTime() {
- long waitTime = sumWaitTime.sum();
-
- long execs = totalExecCnt.sum();
-
- return execs == 0 ? 0 : waitTime / execs;
- }
-
- /** {@inheritDoc} */
- @Override public long totalExecutionCount() {
- return totalExecCnt.longValue();
- }
-
- /** {@inheritDoc} */
- @Override public int failuresCount() {
- return failuresCnt.intValue();
- }
-
- /** {@inheritDoc} */
- @Override public boolean executing() {
- return curActive.intValue() > 0;
- }
-
- /**
- * Execution started callback.
- *
- * @param waitTime Wait time.
- */
- public void onExecutionStarted(long waitTime) {
- if (waitTime < 0)
- waitTime = 0;
-
- curActive.increment();
-
- maxWaitTime.setIfGreater(waitTime);
- minWaitTime.setIfLess(waitTime);
- sumWaitTime.add(waitTime);
- }
-
- /**
- * Execution finished callback.
- *
- * @param execTime Stage execution time.
- */
- public void onExecutionFinished(long execTime) {
- if (execTime < 0)
- execTime = 0;
-
- curActive.decrement();
-
- maxExecTime.setIfGreater(execTime);
- minExecTime.setIfLess(execTime);
- sumExecTime.add(execTime);
-
- totalExecCnt.increment();
- }
-
- /**
- * Failure callback.
- */
- public void onFailure() {
- failuresCnt.increment();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java
deleted file mode 100644
index 74b75ae..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerStageWrapper.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Stage wrapper that handles metrics calculation and time measurement.
- */
-public class GridStreamerStageWrapper implements GridStreamerStage<Object> {
- /** Stage delegate. */
- private GridStreamerStage<Object> delegate;
-
- /** Stage index. */
- private int idx;
-
- /** Next stage name. Set after creation. */
- private String nextStageName;
-
- /**
- * @param delegate Delegate stage.
- * @param idx Index.
- */
- public GridStreamerStageWrapper(GridStreamerStage<Object> delegate, int idx) {
- this.delegate = delegate;
- this.idx = idx;
- }
-
- /**
- * @return Stage index.
- */
- public int index() {
- return idx;
- }
-
- /**
- * @return Next stage name in pipeline or {@code null} if this is the last stage.
- */
- @Nullable public String nextStageName() {
- return nextStageName;
- }
-
- /**
- * @param nextStageName Next stage name in pipeline.
- */
- public void nextStageName(String nextStageName) {
- this.nextStageName = nextStageName;
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return delegate.name();
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts)
- throws GridException {
- return delegate.run(ctx, evts);
- }
-
- /**
- * @return Delegate.
- */
- public GridStreamerStage unwrap() {
- return delegate;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridStreamerStageWrapper.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java
deleted file mode 100644
index c11abdf..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsAdapter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Streamer window metrics adapter.
- */
-public class GridStreamerWindowMetricsAdapter implements GridStreamerWindowMetrics {
- /** Window name. */
- private String name;
-
- /** Window size. */
- private int size;
-
- /** Window eviction queue size. */
- private int evictionQueueSize;
-
- /**
- * @param m Metrics to copy.
- */
- public GridStreamerWindowMetricsAdapter(GridStreamerWindowMetrics m) {
- // Preserve alphabetic order for maintenance.
- evictionQueueSize = m.evictionQueueSize();
- name = m.name();
- size = m.size();
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return size;
- }
-
- /** {@inheritDoc} */
- @Override public int evictionQueueSize() {
- return evictionQueueSize;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridStreamerWindowMetricsAdapter.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java
deleted file mode 100644
index 11ed8c6..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerWindowMetricsHolder.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.kernal.processors.streamer;
-
-import org.gridgain.grid.streamer.*;
-
-/**
- * Streamer window metrics holder.
- */
-public class GridStreamerWindowMetricsHolder implements GridStreamerWindowMetrics {
- /** Window instance. */
- private GridStreamerWindow window;
-
- /**
- * @param window Streamer window.
- */
- public GridStreamerWindowMetricsHolder(GridStreamerWindow window) {
- this.window = window;
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return window.name();
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return window.size();
- }
-
- /** {@inheritDoc} */
- @Override public int evictionQueueSize() {
- return window.evictionQueueSize();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
index c4cd9dd..fc51a0e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerEx.java
@@ -31,7 +31,7 @@ public interface IgniteStreamerEx extends IgniteStreamer {
*
* @return Streamer window.
*/
- public <E> GridStreamerWindow<E> window();
+ public <E> StreamerWindow<E> window();
/**
* Gets streamer window by window name.
@@ -39,7 +39,7 @@ public interface IgniteStreamerEx extends IgniteStreamer {
* @param windowName Window name.
* @return Streamer window.
*/
- @Nullable public <E> GridStreamerWindow<E> window(String windowName);
+ @Nullable public <E> StreamerWindow<E> window(String windowName);
/**
* Called before execution requests are sent to remote nodes or scheduled for local execution.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
index deb3e16..1098e84 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/IgniteStreamerImpl.java
@@ -78,14 +78,14 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
/** Stages. */
@GridToStringInclude
- private Map<String, GridStreamerStageWrapper> stages;
+ private Map<String, StreamerStageWrapper> stages;
/** Windows. */
@GridToStringInclude
- private Map<String, GridStreamerWindow> winMap;
+ private Map<String, StreamerWindow> winMap;
/** Default streamer window. */
- private GridStreamerWindow dfltWin;
+ private StreamerWindow dfltWin;
/** */
private String firstStage;
@@ -191,9 +191,9 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
int stageIdx = 0;
- GridStreamerStageWrapper prev = null;
+ StreamerStageWrapper prev = null;
- for (GridStreamerStage s : c.getStages()) {
+ for (StreamerStage s : c.getStages()) {
String sName = s.name();
if (F.isEmpty(sName))
@@ -207,7 +207,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
if (firstStage == null)
firstStage = sName;
- GridStreamerStageWrapper wrapper = new GridStreamerStageWrapper(s, stageIdx);
+ StreamerStageWrapper wrapper = new StreamerStageWrapper(s, stageIdx);
stages.put(sName, wrapper);
@@ -221,7 +221,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
winMap = new LinkedHashMap<>();
- for (GridStreamerWindow w : c.getWindows()) {
+ for (StreamerWindow w : c.getWindows()) {
String wName = w.name();
if (F.isEmpty(wName))
@@ -292,7 +292,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
* @throws GridException If failed.
*/
private void prepareResources() throws GridException {
- for (GridStreamerStage s : c.getStages())
+ for (StreamerStage s : c.getStages())
ctx.resource().injectGeneric(s);
if (router == null)
@@ -300,7 +300,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
ctx.resource().injectGeneric(router);
- for (GridStreamerWindow w : c.getWindows())
+ for (StreamerWindow w : c.getWindows())
ctx.resource().injectGeneric(w);
}
@@ -359,7 +359,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
}
}
- for (GridStreamerStageWrapper stage : stages.values()) {
+ for (StreamerStageWrapper stage : stages.values()) {
try {
ctx.resource().cleanupGeneric(stage.unwrap());
}
@@ -482,7 +482,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
winLock.writeLock();
try {
- for (GridStreamerWindow win : winMap.values())
+ for (StreamerWindow win : winMap.values())
win.reset();
streamerCtx.localSpace().clear();
@@ -494,22 +494,22 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
/** {@inheritDoc} */
@Override public void resetMetrics() {
- GridStreamerStageMetricsHolder[] stageHolders = new GridStreamerStageMetricsHolder[c.getStages().size()];
+ StreamerStageMetricsHolder[] stageHolders = new StreamerStageMetricsHolder[c.getStages().size()];
int idx = 0;
- for (GridStreamerStage stage : c.getStages()) {
- stageHolders[idx] = new GridStreamerStageMetricsHolder(stage.name());
+ for (StreamerStage stage : c.getStages()) {
+ stageHolders[idx] = new StreamerStageMetricsHolder(stage.name());
idx++;
}
- GridStreamerWindowMetricsHolder[] windowHolders = new GridStreamerWindowMetricsHolder[c.getWindows().size()];
+ StreamerWindowMetricsHolder[] windowHolders = new StreamerWindowMetricsHolder[c.getWindows().size()];
idx = 0;
- for (GridStreamerWindow w : c.getWindows()) {
- windowHolders[idx] = new GridStreamerWindowMetricsHolder(w);
+ for (StreamerWindow w : c.getWindows()) {
+ windowHolders[idx] = new StreamerWindowMetricsHolder(w);
idx++;
}
@@ -523,13 +523,13 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
}
/** {@inheritDoc} */
- @Override public <E> GridStreamerWindow<E> window() {
- return (GridStreamerWindow<E>)dfltWin;
+ @Override public <E> StreamerWindow<E> window() {
+ return (StreamerWindow<E>)dfltWin;
}
/** {@inheritDoc} */
- @Override public <E> GridStreamerWindow<E> window(String windowName) {
- return (GridStreamerWindow<E>)winMap.get(windowName);
+ @Override public <E> StreamerWindow<E> window(String windowName) {
+ return (StreamerWindow<E>)winMap.get(windowName);
}
/**
@@ -712,7 +712,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
if (log.isDebugEnabled())
log.debug("Scheduling local batch execution [futId=" + futId + ", stageName=" + batch.stageName() + ']');
- GridStreamerStageWrapper wrapper = stages.get(batch.stageName());
+ StreamerStageWrapper wrapper = stages.get(batch.stageName());
if (wrapper == null) {
completeParentStage(ctx.localNodeId(), batch.futureId(),
@@ -1226,7 +1226,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
private GridStreamerExecutionBatch batch;
/** */
- private GridStreamerStageWrapper stageWrapper;
+ private StreamerStageWrapper stageWrapper;
/** Streamer metrics holder. */
private StreamerMetricsHolder streamerHolder;
@@ -1246,7 +1246,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
*/
private BatchWorker(
GridStreamerExecutionBatch batch,
- GridStreamerStageWrapper stageWrapper,
+ StreamerStageWrapper stageWrapper,
StreamerMetricsHolder streamerHolder
) {
super(ctx.gridName(), "streamer-batch-worker-" + batch.stageName(), log);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
index 2eb99ee..445bf9a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsAdapter.java
@@ -78,11 +78,11 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
/** */
@GridToStringInclude
- private Map<String, GridStreamerStageMetrics> stageMetrics;
+ private Map<String, StreamerStageMetrics> stageMetrics;
/** */
@GridToStringInclude
- private Map<String, GridStreamerWindowMetrics> windowMetrics;
+ private Map<String, StreamerWindowMetrics> windowMetrics;
/**
* Empty constructor.
@@ -117,17 +117,17 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
stageWaitingExecCnt = metrics.stageWaitingExecutionCount();
// Stage metrics.
- Map<String, GridStreamerStageMetrics> map = U.newLinkedHashMap(metrics.stageMetrics().size());
+ Map<String, StreamerStageMetrics> map = U.newLinkedHashMap(metrics.stageMetrics().size());
- for (GridStreamerStageMetrics m : metrics.stageMetrics())
- map.put(m.name(), new GridStreamerStageMetricsAdapter(m));
+ for (StreamerStageMetrics m : metrics.stageMetrics())
+ map.put(m.name(), new StreamerStageMetricsAdapter(m));
stageMetrics = Collections.unmodifiableMap(map);
- Map<String, GridStreamerWindowMetrics> map0 = U.newLinkedHashMap(metrics.windowMetrics().size());
+ Map<String, StreamerWindowMetrics> map0 = U.newLinkedHashMap(metrics.windowMetrics().size());
- for (GridStreamerWindowMetrics m : metrics.windowMetrics())
- map0.put(m.name(), new GridStreamerWindowMetricsAdapter(m));
+ for (StreamerWindowMetrics m : metrics.windowMetrics())
+ map0.put(m.name(), new StreamerWindowMetricsAdapter(m));
windowMetrics = Collections.unmodifiableMap(map0);
}
@@ -228,8 +228,8 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
}
/** {@inheritDoc} */
- @Override public GridStreamerStageMetrics stageMetrics(String stageName) {
- GridStreamerStageMetrics metrics = stageMetrics.get(stageName);
+ @Override public StreamerStageMetrics stageMetrics(String stageName) {
+ StreamerStageMetrics metrics = stageMetrics.get(stageName);
if (metrics == null)
throw new IllegalArgumentException("Streamer stage is not configured: " + stageName);
@@ -238,13 +238,13 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
}
/** {@inheritDoc} */
- @Override public Collection<GridStreamerStageMetrics> stageMetrics() {
+ @Override public Collection<StreamerStageMetrics> stageMetrics() {
return stageMetrics.values();
}
/** {@inheritDoc} */
- @Override public GridStreamerWindowMetrics windowMetrics(String winName) {
- GridStreamerWindowMetrics metrics = windowMetrics.get(winName);
+ @Override public StreamerWindowMetrics windowMetrics(String winName) {
+ StreamerWindowMetrics metrics = windowMetrics.get(winName);
if (metrics == null)
throw new IllegalArgumentException("Streamer window is not configured: " + winName);
@@ -253,7 +253,7 @@ public class StreamerMetricsAdapter implements StreamerMetrics {
}
/** {@inheritDoc} */
- @Override public Collection<GridStreamerWindowMetrics> windowMetrics() {
+ @Override public Collection<StreamerWindowMetrics> windowMetrics() {
return windowMetrics.values();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
index eed6408..3ad6b2b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerMetricsHolder.java
@@ -89,13 +89,13 @@ public class StreamerMetricsHolder implements StreamerMetrics {
private LongAdder failuresCnt = new LongAdder();
/** Stages metrics. */
- private final GridStreamerStageMetricsHolder[] stageMetrics;
+ private final StreamerStageMetricsHolder[] stageMetrics;
/** Stage metrics map. */
- private final Map<String, GridStreamerStageMetrics> stageMetricsMap;
+ private final Map<String, StreamerStageMetrics> stageMetricsMap;
/** Window metrics map. */
- private final Map<String, GridStreamerWindowMetrics> windowMetricsMap;
+ private final Map<String, StreamerWindowMetrics> windowMetricsMap;
/** Executor service capacity. */
private final int execSvcCap;
@@ -106,23 +106,23 @@ public class StreamerMetricsHolder implements StreamerMetrics {
* @param execSvcCap Executor service capacity.
*/
public StreamerMetricsHolder(
- GridStreamerStageMetricsHolder[] stageMetrics,
- GridStreamerWindowMetricsHolder[] windowMetrics,
+ StreamerStageMetricsHolder[] stageMetrics,
+ StreamerWindowMetricsHolder[] windowMetrics,
int execSvcCap
) {
this.execSvcCap = execSvcCap;
this.stageMetrics = stageMetrics;
- Map<String, GridStreamerStageMetrics> map = new LinkedHashMap<>();
+ Map<String, StreamerStageMetrics> map = new LinkedHashMap<>();
- for (GridStreamerStageMetricsHolder holder : stageMetrics)
+ for (StreamerStageMetricsHolder holder : stageMetrics)
map.put(holder.name(), holder);
stageMetricsMap = Collections.unmodifiableMap(map);
- Map<String, GridStreamerWindowMetrics> map0 = new LinkedHashMap<>();
+ Map<String, StreamerWindowMetrics> map0 = new LinkedHashMap<>();
- for (GridStreamerWindowMetricsHolder holder : windowMetrics)
+ for (StreamerWindowMetricsHolder holder : windowMetrics)
map0.put(holder.name(), holder);
windowMetricsMap = Collections.unmodifiableMap(map0);
@@ -248,8 +248,8 @@ public class StreamerMetricsHolder implements StreamerMetrics {
}
/** {@inheritDoc} */
- @Override public GridStreamerStageMetrics stageMetrics(String stageName) {
- GridStreamerStageMetrics metrics = stageMetricsMap.get(stageName);
+ @Override public StreamerStageMetrics stageMetrics(String stageName) {
+ StreamerStageMetrics metrics = stageMetricsMap.get(stageName);
if (metrics == null)
throw new IllegalArgumentException("Streamer stage is not configured: " + stageName);
@@ -258,13 +258,13 @@ public class StreamerMetricsHolder implements StreamerMetrics {
}
/** {@inheritDoc} */
- @Override public Collection<GridStreamerStageMetrics> stageMetrics() {
+ @Override public Collection<StreamerStageMetrics> stageMetrics() {
return stageMetricsMap.values();
}
/** {@inheritDoc} */
- @Override public GridStreamerWindowMetrics windowMetrics(String winName) {
- GridStreamerWindowMetrics metrics = windowMetricsMap.get(winName);
+ @Override public StreamerWindowMetrics windowMetrics(String winName) {
+ StreamerWindowMetrics metrics = windowMetricsMap.get(winName);
if (metrics == null)
throw new IllegalArgumentException("Streamer window is not configured: " + winName);
@@ -273,7 +273,7 @@ public class StreamerMetricsHolder implements StreamerMetrics {
}
/** {@inheritDoc} */
- @Override public Collection<GridStreamerWindowMetrics> windowMetrics() {
+ @Override public Collection<StreamerWindowMetrics> windowMetrics() {
return windowMetricsMap.values();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java
new file mode 100644
index 0000000..a6101f9
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMBeanAdapter.java
@@ -0,0 +1,93 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+
+/**
+ * Streamer stage MBean adapter.
+ */
+@SuppressWarnings("ConstantConditions")
+public class StreamerStageMBeanAdapter implements StreamerStageMBean {
+ /** Stage name. */
+ private String stageName;
+
+ /** Stage class name. */
+ private String stageClsName;
+
+ /** */
+ private IgniteStreamerImpl streamer;
+
+ /**
+ * @param stageName Stage name.
+ * @param stageClsName Stage class name.
+ * @param streamer Streamer implementation.
+ */
+ public StreamerStageMBeanAdapter(String stageName, String stageClsName, IgniteStreamerImpl streamer) {
+ this.stageName = stageName;
+ this.stageClsName = stageClsName;
+ this.streamer = streamer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return stageName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStageClassName() {
+ return stageClsName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMinimumExecutionTime() {
+ return streamer.metrics().stageMetrics(stageName).minimumExecutionTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaximumExecutionTime() {
+ return streamer.metrics().stageMetrics(stageName).maximumExecutionTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getAverageExecutionTime() {
+ return streamer.metrics().stageMetrics(stageName).averageExecutionTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMinimumWaitingTime() {
+ return streamer.metrics().stageMetrics(stageName).minimumWaitingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getMaximumWaitingTime() {
+ return streamer.metrics().stageMetrics(stageName).maximumWaitingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getAverageWaitingTime() {
+ return streamer.metrics().stageMetrics(stageName).averageWaitingTime();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getTotalExecutionCount() {
+ return streamer.metrics().stageMetrics(stageName).totalExecutionCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getFailuresCount() {
+ return streamer.metrics().stageMetrics(stageName).failuresCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isExecuting() {
+ return streamer.metrics().stageMetrics(stageName).executing();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java
new file mode 100644
index 0000000..afa7543
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsAdapter.java
@@ -0,0 +1,127 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Streamer stage metrics adapter.
+ */
+public class StreamerStageMetricsAdapter implements StreamerStageMetrics {
+ /** */
+ private String name;
+
+ /** */
+ private long minExecTime;
+
+ /** */
+ private long maxExecTime;
+
+ /** */
+ private long avgExecTime;
+
+ /** */
+ private long minWaitTime;
+
+ /** */
+ private long maxWaitTime;
+
+ /** */
+ private long avgWaitTime;
+
+ /** */
+ private long totalExecCnt;
+
+ /** */
+ private int failuresCnt;
+
+ /** */
+ private boolean executing;
+
+ /**
+ * Empty constructor.
+ */
+ public StreamerStageMetricsAdapter() {
+ // No-op.
+ }
+
+ /**
+ * @param metrics Metrics.
+ */
+ public StreamerStageMetricsAdapter(StreamerStageMetrics metrics) {
+ // Preserve alphabetic order for maintenance.
+ avgExecTime = metrics.averageExecutionTime();
+ avgWaitTime = metrics.averageWaitingTime();
+ executing = metrics.executing();
+ failuresCnt = metrics.failuresCount();
+ maxExecTime = metrics.maximumExecutionTime();
+ maxWaitTime = metrics.maximumWaitingTime();
+ minExecTime = metrics.minimumExecutionTime();
+ minWaitTime = metrics.minimumWaitingTime();
+ name = metrics.name();
+ totalExecCnt = metrics.totalExecutionCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long minimumExecutionTime() {
+ return minExecTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long maximumExecutionTime() {
+ return maxExecTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long averageExecutionTime() {
+ return avgExecTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long totalExecutionCount() {
+ return totalExecCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long minimumWaitingTime() {
+ return minWaitTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long maximumWaitingTime() {
+ return maxWaitTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long averageWaitingTime() {
+ return avgWaitTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int failuresCount() {
+ return failuresCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean executing() {
+ return executing;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StreamerStageMetricsAdapter.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java
new file mode 100644
index 0000000..cb76166
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageMetricsHolder.java
@@ -0,0 +1,159 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.*;
+import org.jdk8.backport.*;
+
+/**
+ * Streamer stage metrics holder.
+ */
+public class StreamerStageMetricsHolder implements StreamerStageMetrics {
+ /** Stage name. */
+ private String name;
+
+ /** Minimum execution time. */
+ private GridAtomicLong minExecTime = new GridAtomicLong(Long.MAX_VALUE);
+
+ /** Maximum execution time. */
+ private GridAtomicLong maxExecTime = new GridAtomicLong();
+
+ /** Stage execution time sum. */
+ private LongAdder sumExecTime = new LongAdder();
+
+ /** Stage minimum waiting time. */
+ private GridAtomicLong minWaitTime = new GridAtomicLong(Long.MAX_VALUE);
+
+ /** Stage maximum waiting time. */
+ private GridAtomicLong maxWaitTime = new GridAtomicLong();
+
+ /** Stage average waiting time sum. */
+ private LongAdder sumWaitTime = new LongAdder();
+
+ /** Total number of times this stage was executed. */
+ private LongAdder totalExecCnt = new LongAdder();
+
+ /** Failures count. */
+ private LongAdder failuresCnt = new LongAdder();
+
+ /** Number of threads executing this stage. */
+ private LongAdder curActive = new LongAdder();
+
+ /**
+ * @param name Stage name.
+ */
+ public StreamerStageMetricsHolder(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long minimumExecutionTime() {
+ long min = minExecTime.get();
+
+ return min == Long.MAX_VALUE ? 0 : min;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long maximumExecutionTime() {
+ return maxExecTime.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long averageExecutionTime() {
+ long execTime = sumExecTime.sum();
+
+ long execs = totalExecCnt.sum();
+
+ return execs == 0 ? 0 : execTime / execs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long minimumWaitingTime() {
+ long min = minWaitTime.get();
+
+ return min == Long.MAX_VALUE ? 0 : min;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long maximumWaitingTime() {
+ return maxWaitTime.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long averageWaitingTime() {
+ long waitTime = sumWaitTime.sum();
+
+ long execs = totalExecCnt.sum();
+
+ return execs == 0 ? 0 : waitTime / execs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long totalExecutionCount() {
+ return totalExecCnt.longValue();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int failuresCount() {
+ return failuresCnt.intValue();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean executing() {
+ return curActive.intValue() > 0;
+ }
+
+ /**
+ * Execution started callback.
+ *
+ * @param waitTime Wait time.
+ */
+ public void onExecutionStarted(long waitTime) {
+ if (waitTime < 0)
+ waitTime = 0;
+
+ curActive.increment();
+
+ maxWaitTime.setIfGreater(waitTime);
+ minWaitTime.setIfLess(waitTime);
+ sumWaitTime.add(waitTime);
+ }
+
+ /**
+ * Execution finished callback.
+ *
+ * @param execTime Stage execution time.
+ */
+ public void onExecutionFinished(long execTime) {
+ if (execTime < 0)
+ execTime = 0;
+
+ curActive.decrement();
+
+ maxExecTime.setIfGreater(execTime);
+ minExecTime.setIfLess(execTime);
+ sumExecTime.add(execTime);
+
+ totalExecCnt.increment();
+ }
+
+ /**
+ * Failure callback.
+ */
+ public void onFailure() {
+ failuresCnt.increment();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java
new file mode 100644
index 0000000..4f55a78
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerStageWrapper.java
@@ -0,0 +1,84 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Stage wrapper that handles metrics calculation and time measurement.
+ */
+public class StreamerStageWrapper implements StreamerStage<Object> {
+ /** Stage delegate. */
+ private StreamerStage<Object> delegate;
+
+ /** Stage index. */
+ private int idx;
+
+ /** Next stage name. Set after creation. */
+ private String nextStageName;
+
+ /**
+ * @param delegate Delegate stage.
+ * @param idx Index.
+ */
+ public StreamerStageWrapper(StreamerStage<Object> delegate, int idx) {
+ this.delegate = delegate;
+ this.idx = idx;
+ }
+
+ /**
+ * @return Stage index.
+ */
+ public int index() {
+ return idx;
+ }
+
+ /**
+ * @return Next stage name in pipeline or {@code null} if this is the last stage.
+ */
+ @Nullable public String nextStageName() {
+ return nextStageName;
+ }
+
+ /**
+ * @param nextStageName Next stage name in pipeline.
+ */
+ public void nextStageName(String nextStageName) {
+ this.nextStageName = nextStageName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return delegate.name();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts)
+ throws GridException {
+ return delegate.run(ctx, evts);
+ }
+
+ /**
+ * @return Delegate.
+ */
+ public StreamerStage unwrap() {
+ return delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(StreamerStageWrapper.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java
new file mode 100644
index 0000000..b181fa7
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsAdapter.java
@@ -0,0 +1,57 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+/**
+ * Streamer window metrics adapter: an immutable copy of another {@link StreamerWindowMetrics} taken at construction.
+ */
+public class StreamerWindowMetricsAdapter implements StreamerWindowMetrics {
+    /** Window name copied from the source metrics. */
+    private final String name;
+
+    /** Window size copied from the source metrics. */
+    private final int size;
+
+    /** Window eviction queue size copied from the source metrics. */
+    private final int evictionQueueSize;
+
+    /**
+     * @param m Metrics to copy; each value is read once here and never refreshed afterwards.
+     */
+    public StreamerWindowMetricsAdapter(StreamerWindowMetrics m) {
+        // Preserve alphabetic order for maintenance.
+        evictionQueueSize = m.evictionQueueSize();
+        name = m.name();
+        size = m.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int evictionQueueSize() {
+        return evictionQueueSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StreamerWindowMetricsAdapter.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java
new file mode 100644
index 0000000..e7a1e02
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/StreamerWindowMetricsHolder.java
@@ -0,0 +1,42 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal.processors.streamer;
+
+import org.gridgain.grid.streamer.*;
+
+/**
+ * Streamer window metrics holder: a view that reads every metric from the wrapped {@link StreamerWindow} on each call.
+ */
+public class StreamerWindowMetricsHolder implements StreamerWindowMetrics {
+    /** Window instance the metrics are read from. Never reassigned. */
+    private final StreamerWindow window;
+
+    /**
+     * @param window Streamer window whose name, size and eviction queue size are exposed.
+     */
+    public StreamerWindowMetricsHolder(StreamerWindow window) {
+        this.window = window;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return window.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return window.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int evictionQueueSize() {
+        return window.evictionQueueSize();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
index 08b6efe..1717333 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerMetrics.java
@@ -81,7 +81,7 @@ public class VisorStreamerMetrics implements Serializable {
int windowSz = 0;
- for (GridStreamerWindowMetrics wm : m.windowMetrics())
+ for (StreamerWindowMetrics wm : m.windowMetrics())
windowSz += wm.size();
VisorStreamerMetrics metrics = new VisorStreamerMetrics();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
index 0462476..ade0027 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/visor/streamer/VisorStreamerStageMetrics.java
@@ -17,7 +17,7 @@ import java.io.*;
import java.util.*;
/**
- * Data transfer object for {@link GridStreamerStageMetrics}.
+ * Data transfer object for {@link org.gridgain.grid.streamer.StreamerStageMetrics}.
*/
public class VisorStreamerStageMetrics implements Serializable {
/** */
@@ -60,7 +60,7 @@ public class VisorStreamerStageMetrics implements Serializable {
private int failuresFreq = -1;
/** Create data transfer object for given metrics. */
- public static VisorStreamerStageMetrics from(GridStreamerStageMetrics m) {
+ public static VisorStreamerStageMetrics from(StreamerStageMetrics m) {
assert m != null;
VisorStreamerStageMetrics metrics = new VisorStreamerStageMetrics();
@@ -88,7 +88,7 @@ public class VisorStreamerStageMetrics implements Serializable {
Collection<VisorStreamerStageMetrics> res = new ArrayList<>();
- for (GridStreamerStageMetrics m : streamer.metrics().stageMetrics())
+ for (StreamerStageMetrics m : streamer.metrics().stageMetrics())
res.add(from(m));
return res;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
deleted file mode 100644
index b994f4e..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStage.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.gridgain.grid.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer stage is a component that determines event processing flow. User logic related to
- * any particular event processing is implemented by streamer stage. A stage takes events as
- * an input and returns groups of events mapped to different stages as an output. Events for
- * every returned stage will be passed to {@link StreamerEventRouter} which will determine
- * on which node the stage should be executed.
- * <p>
- * Generally, event stage execution graph if fully controlled by return values of
- * this method, while node execution graph is controlled by
- * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
- */
-public interface GridStreamerStage<IN> {
- /**
- * Gets streamer stage name.
- *
- * @return Name of the stage.
- */
- public String name();
-
- /**
- * Stage execution routine. After the passed in events are processed, stage can emit
- * another set of events to be processed. The returned events can be mapped to different
- * stages. Events for every returned stage will be passed to {@link StreamerEventRouter}
- * which will determine on which node the stage should be executed.
- * <p>
- * Generally, event stage execution graph if fully controlled by return values of
- * this method, while node execution graph is controlled by
- * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
- *
- * @param ctx Streamer context.
- * @param evts Input events.
- * @return Map of stage name to collection of events.
- * @throws GridException If failed.
- */
- @Nullable public Map<String, Collection<?>> run(StreamerContext ctx, Collection<IN> evts)
- throws GridException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java
deleted file mode 100644
index 7505d2f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMBean.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.apache.ignite.mbean.*;
-
-/**
- * Streamer stage MBean.
- */
-@IgniteMBeanDescription("MBean that provides access to streamer stage description and metrics.")
-public interface GridStreamerStageMBean {
- /**
- * Gets stage name.
- *
- * @return Stage name.
- */
- @IgniteMBeanDescription("Stage name.")
- public String getName();
-
- /**
- * Gets stage class name.
- *
- * @return Stage class name.
- */
- @IgniteMBeanDescription("Stage class name.")
- public String getStageClassName();
-
- /**
- * Gets stage minimum execution time.
- *
- * @return Stage minimum execution time.
- */
- @IgniteMBeanDescription("Stage minimum execution time.")
- public long getMinimumExecutionTime();
-
- /**
- * Gets stage maximum execution time.
- *
- * @return Stage maximum execution time.
- */
- @IgniteMBeanDescription("Stage maximum execution time.")
- public long getMaximumExecutionTime();
-
- /**
- * Gets stage average execution time.
- *
- * @return Stage average execution time.
- */
- @IgniteMBeanDescription("Stage average execution time.")
- public long getAverageExecutionTime();
-
- /**
- * Gets stage minimum waiting time.
- *
- * @return Stage minimum waiting time.
- */
- @IgniteMBeanDescription("Stage minimum waiting time.")
- public long getMinimumWaitingTime();
-
- /**
- * Gets stage maximum waiting time.
- *
- * @return Stage maximum waiting time.
- */
- @IgniteMBeanDescription("Stage maximum waiting time.")
- public long getMaximumWaitingTime();
-
- /**
- * Stage average waiting time.
- *
- * @return Stage average waiting time.
- */
- @IgniteMBeanDescription("Stage average waiting time.")
- public long getAverageWaitingTime();
-
- /**
- * Gets total stage execution count since last reset.
- *
- * @return Number of times this stage was executed.
- */
- @IgniteMBeanDescription("Number of times this stage was executed.")
- public long getTotalExecutionCount();
-
- /**
- * Gets stage failure count.
- *
- * @return Stage failure count.
- */
- @IgniteMBeanDescription("Stage failure count.")
- public int getFailuresCount();
-
- /**
- * Gets flag indicating if stage is being currently executed by at least one thread on current node.
- *
- * @return {@code True} if stage is executing now.
- */
- @IgniteMBeanDescription("Whether stage is currently being executed.")
- public boolean isExecuting();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java
deleted file mode 100644
index f0beeb5..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerStageMetrics.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-/**
- * Streamer stage metrics.
- */
-public interface GridStreamerStageMetrics {
- /**
- * Gets stage name.
- *
- * @return Stage name.
- */
- public String name();
-
- /**
- * Gets stage minimum execution time.
- *
- * @return Stage minimum execution time.
- */
- public long minimumExecutionTime();
-
- /**
- * Gets stage maximum execution time.
- *
- * @return Stage maximum execution time.
- */
- public long maximumExecutionTime();
-
- /**
- * Gets stage average execution time.
- *
- * @return Stage average execution time.
- */
- public long averageExecutionTime();
-
- /**
- * Gets stage minimum waiting time.
- *
- * @return Stage minimum waiting time.
- */
- public long minimumWaitingTime();
-
- /**
- * Gets stage maximum waiting time.
- *
- * @return Stage maximum waiting time.
- */
- public long maximumWaitingTime();
-
- /**
- * Stage average waiting time.
- *
- * @return Stage average waiting time.
- */
- public long averageWaitingTime();
-
- /**
- * Gets total stage execution count since last reset.
- *
- * @return Number of times this stage was executed.
- */
- public long totalExecutionCount();
-
- /**
- * Gets stage failure count.
- *
- * @return Stage failure count.
- */
- public int failuresCount();
-
- /**
- * Gets flag indicating if stage is being currently executed by at least one thread on current node.
- *
- * @return {@code True} if stage is executing now.
- */
- public boolean executing();
-}