You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:08 UTC
[07/32] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java
new file mode 100644
index 0000000..1e5cf5f
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMBean.java
@@ -0,0 +1,161 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.apache.ignite.mbean.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * Streamer MBean interface.
+ */
+@IgniteMBeanDescription("MBean that provides access to streamer description and metrics.")
+public interface StreamerMBean {
+ /**
+ * Gets streamer name.
+ *
+ * @return Streamer name.
+ */
+ @IgniteMBeanDescription("Streamer name.")
+ @Nullable public String getName();
+
+ /**
+ * Gets {@code atLeastOnce} configuration flag.
+ *
+ * @return {@code True} if {@code atLeastOnce} is configured.
+ */
+ @IgniteMBeanDescription("True if atLeastOnce is configured.")
+ public boolean isAtLeastOnce();
+
+ /**
+ * Gets size of stage futures map. This map is maintained only when {@code atLeastOnce} configuration
+ * flag is set to true.
+ *
+ * @return Stage future map size.
+ */
+ @IgniteMBeanDescription("Stage future map size.")
+ public int getStageFutureMapSize();
+
+ /**
+ * Gets size of batch futures map.
+ *
+ * @return Batch future map size.
+ */
+ @IgniteMBeanDescription("Batch future map size.")
+ public int getBatchFutureMapSize();
+
+ /**
+ * Gets number of stages currently being executed in streamer pool.
+ *
+ * @return Number of stages. Cannot be more than pool thread count.
+ */
+ @IgniteMBeanDescription("Number of stages currently being executed in streamer pool.")
+ public int getStageActiveExecutionCount();
+
+ /**
+ * Gets number of event batches currently waiting to be executed.
+ *
+ * @return Number of event batches waiting to be processed.
+ */
+ @IgniteMBeanDescription("Number of event batches currently waiting to be executed.")
+ public int getStageWaitingExecutionCount();
+
+ /**
+ * Gets total number of stages executed since last reset.
+ *
+ * @return Total number of stages executed since last reset.
+ */
+ @IgniteMBeanDescription("Total number of stages executed since last reset.")
+ public long getStageTotalExecutionCount();
+
+ /**
+ * Gets pipeline maximum execution time, i.e. time between execution start and time when last stage in pipeline
+ * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+ * recorded independently.
+ *
+ * @return Pipeline maximum execution time.
+ */
+ @IgniteMBeanDescription("Pipeline maximum execution time.")
+ public long getPipelineMaximumExecutionTime();
+
+ /**
+ * Gets pipeline minimum execution time, i.e. time between execution start and time when last stage in pipeline
+ * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+ * recorded independently.
+ *
+ * @return Pipeline minimum execution time.
+ */
+ @IgniteMBeanDescription("Pipeline minimum execution time.")
+ public long getPipelineMinimumExecutionTime();
+
+ /**
+ * Gets pipeline average execution time, i.e. time between execution start and time when last stage in pipeline
+ * returned empty map. If pipeline execution was split, metrics for each split will be recorded independently.
+ *
+ * @return Pipeline average execution time.
+ */
+ @IgniteMBeanDescription("Pipeline average execution time.")
+ public long getPipelineAverageExecutionTime();
+
+ /**
+ * Gets maximum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Maximum number of unique nodes in pipeline execution.
+ */
+ @IgniteMBeanDescription("Maximum number of unique nodes participated in pipeline execution.")
+ public int getPipelineMaximumExecutionNodes();
+
+ /**
+ * Gets minimum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Minimum number of unique nodes in pipeline execution.
+ */
+ @IgniteMBeanDescription("Minimum number of unique nodes participated in pipeline execution.")
+ public int getPipelineMinimumExecutionNodes();
+
+ /**
+ * Gets average number of unique nodes participated in pipeline execution. If pipeline execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Average number of unique nodes in pipeline execution.
+ */
+ @IgniteMBeanDescription("Average number of unique nodes participated in pipeline execution.")
+ public int getPipelineAverageExecutionNodes();
+
+ /**
+ * Gets number of current active sessions. Since event execution sessions are tracked only when
+ * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+ * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+ *
+ * @return Number of current active sessions.
+ */
+ @IgniteMBeanDescription("Number of current active sessions.")
+ public int getCurrentActiveSessions();
+
+ /**
+ * Gets maximum number of active sessions since last reset. Since event execution sessions are tracked only when
+ * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+ * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+ *
+ * @return Maximum active sessions since last reset.
+ */
+ @IgniteMBeanDescription("Maximum number of active sessions since last reset.")
+ public int getMaximumActiveSessions();
+
+ /**
+ * Gets number of failures since last reset. If {@code atLeastOnce} flag is set to steamer configuration,
+ * then only root node failures will be counted. Otherwise each node will count failures independently.
+ *
+ * @return Failures count.
+ */
+ @IgniteMBeanDescription("Number of failures since last reset.")
+ public int getFailuresCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
new file mode 100644
index 0000000..eaeb2f1
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
@@ -0,0 +1,201 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import java.util.*;
+
+/**
+ * Streamer metrics.
+ */
+public interface StreamerMetrics {
+ /**
+ * Gets number of stages currently being executed in streamer pool.
+ *
+ * @return Number of stages. Cannot be more than pool thread count.
+ */
+ public int stageActiveExecutionCount();
+
+ /**
+ * Gets number of event batches currently waiting to be executed.
+ *
+ * @return Number of event batches waiting to be processed.
+ */
+ public int stageWaitingExecutionCount();
+
+ /**
+ * Gets total number of stages executed since last reset.
+ *
+ * @return Total number of stages executed since last reset.
+ */
+ public long stageTotalExecutionCount();
+
+ /**
+ * Gets pipeline maximum execution time, i.e. time between execution start and time when last stage in pipeline
+ * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+ * recorded independently.
+ *
+ * @return Pipeline maximum execution time.
+ */
+ public long pipelineMaximumExecutionTime();
+
+ /**
+ * Gets pipeline minimum execution time, i.e. time between execution start and time when last stage in pipeline
+ * returned empty map. If pipeline execution was split to different nodes, metrics for each split will be
+ * recorded independently.
+ *
+ * @return Pipeline minimum execution time.
+ */
+ public long pipelineMinimumExecutionTime();
+
+ /**
+ * Gets pipeline average execution time, i.e. time between execution start and time when last stage in pipeline
+ * returned empty map. If pipeline execution was split, metrics for each split will be recorded independently.
+ *
+ * @return Pipeline average execution time.
+ */
+ public long pipelineAverageExecutionTime();
+
+ /**
+ * Gets maximum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Maximum number of unique nodes in pipeline execution.
+ */
+ public int pipelineMaximumExecutionNodes();
+
+ /**
+ * Gets minimum number of unique nodes participated in pipeline execution. If pipeline execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Minimum number of unique nodes in pipeline execution.
+ */
+ public int pipelineMinimumExecutionNodes();
+
+ /**
+ * Gets average number of unique nodes participated in pipeline execution. If pipeline execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Average number of unique nodes in pipeline execution.
+ */
+ public int pipelineAverageExecutionNodes();
+
+ /**
+ * Gets query maximum execution time. If query execution was split to different nodes, metrics for each split
+ * will be recorded independently.
+ *
+ * @return Query maximum execution time.
+ */
+ public long queryMaximumExecutionTime();
+
+ /**
+ * Gets query minimum execution time. If query execution was split to different nodes, metrics for each split
+ * will be recorded independently.
+ *
+ * @return Query minimum execution time.
+ */
+ public long queryMinimumExecutionTime();
+
+ /**
+ * Gets query average execution time. If query execution was split to different nodes, metrics for each split
+ * will be recorded independently.
+ *
+ * @return Query average execution time.
+ */
+ public long queryAverageExecutionTime();
+
+ /**
+ * Gets maximum number of unique nodes participated in query execution. If query execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Maximum number of unique nodes in query execution.
+ */
+ public int queryMaximumExecutionNodes();
+
+ /**
+ * Gets minimum number of unique nodes participated in query execution. If query execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Minimum number of unique nodes in query execution.
+ */
+ public int queryMinimumExecutionNodes();
+
+ /**
+ * Gets average number of unique nodes participated in query execution. If query execution was split,
+ * metrics for each split will be recorded independently.
+ *
+ * @return Average number of unique nodes in query execution.
+ */
+ public int queryAverageExecutionNodes();
+
+ /**
+ * Gets number of current active sessions. Since event execution sessions are tracked only when
+ * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+ * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+ *
+ * @return Number of current active sessions.
+ */
+ public int currentActiveSessions();
+
+ /**
+ * Gets maximum number of active sessions since last reset. Since event execution sessions are tracked only when
+ * {@code atLeastOnce} configuration property is set to {@code true}, this metric will be collected
+ * only in this case. When {@code atLeastOnce} is set to {@code false}, this metric will always be zero.
+ *
+ * @return Maximum active sessions since last reset.
+ */
+ public int maximumActiveSessions();
+
+ /**
+ * Gets number of failures. If {@code atLeastOnce} flag is set to steamer configuration, then only root node
+ * failures will be counted. Otherwise each node will count failures independently.
+ *
+ * @return Failures count.
+ */
+ public int failuresCount();
+
+ /**
+ * Gets maximum number of threads in executor service.
+ *
+ * @return Maximum number of threads in executor service.
+ */
+ public int executorServiceCapacity();
+
+ /**
+ * Gets current stage metrics, if stage with given name is not configured
+ * then {@link IllegalArgumentException} will be thrown.
+ *
+ * @param stageName Stage name.
+ * @return Stage metrics.
+ */
+ public GridStreamerStageMetrics stageMetrics(String stageName);
+
+ /**
+ * Gets metrics for all stages. Stage metrics order is the same as stage order in configuration.
+ *
+ * @return Stage metrics collection.
+ */
+ public Collection<GridStreamerStageMetrics> stageMetrics();
+
+ /**
+ * Gets current window metrics, if window with given name is not configured
+ * then {@link IllegalArgumentException} will be thrown.
+ *
+ * @param winName Window name.
+ * @return Window metrics.
+ */
+ public GridStreamerWindowMetrics windowMetrics(String winName);
+
+ /**
+ * Gets metrics for all windows.
+ *
+ * @return Collection of window metrics.
+ */
+ public Collection<GridStreamerWindowMetrics> windowMetrics();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
index 97d3df1..5a71fc2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerAffinityEventRouter.java
@@ -30,7 +30,7 @@ import java.util.*;
* via {@link AffinityEvent#affinityKey()} method. If event does not implement
* {@link AffinityEvent} interface, then event itself will be used to determine affinity.
*/
-public class GridStreamerAffinityEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerAffinityEventRouter extends StreamerEventRouterAdapter {
/** */
public static final int REPLICA_CNT = 128;
@@ -56,7 +56,7 @@ public class GridStreamerAffinityEventRouter extends GridStreamerEventRouterAdap
private Collection<UUID> addedNodes = new GridConcurrentHashSet<>();
/** {@inheritDoc} */
- @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
return node(evt instanceof AffinityEvent ? ((AffinityEvent) evt).affinityKey() :
evt, ctx);
}
@@ -66,7 +66,7 @@ public class GridStreamerAffinityEventRouter extends GridStreamerEventRouterAdap
* @param ctx Context.
* @return Rich node.
*/
- private ClusterNode node(Object obj, GridStreamerContext ctx) {
+ private ClusterNode node(Object obj, StreamerContext ctx) {
while (true) {
Collection<ClusterNode> nodes = ctx.projection().nodes();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
index 99af374..c4f1d82 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerCacheAffinityEventRouter.java
@@ -24,7 +24,7 @@ import org.jetbrains.annotations.*;
* via {@link CacheAffinityEvent#affinityKey()} method. If event does not implement
* {@link CacheAffinityEvent} interface, then event will be routed always to local node.
*/
-public class GridStreamerCacheAffinityEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerCacheAffinityEventRouter extends StreamerEventRouterAdapter {
/**
* All events that implement this interface will be routed based on key affinity.
*/
@@ -46,7 +46,7 @@ public class GridStreamerCacheAffinityEventRouter extends GridStreamerEventRoute
private Ignite ignite;
/** {@inheritDoc} */
- @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
if (evt instanceof CacheAffinityEvent) {
CacheAffinityEvent e = (CacheAffinityEvent)evt;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
index b9eccd0..6f6794a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerLocalEventRouter.java
@@ -20,18 +20,18 @@ import java.util.*;
/**
* Local router. Always routes event to local node.
*/
-public class GridStreamerLocalEventRouter implements GridStreamerEventRouter {
+public class GridStreamerLocalEventRouter implements StreamerEventRouter {
/** Grid instance. */
@IgniteInstanceResource
private Ignite ignite;
/** {@inheritDoc} */
- @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
return ignite.cluster().localNode();
}
/** {@inheritDoc} */
- @Override public <T> Map<ClusterNode, Collection<T>> route(GridStreamerContext ctx, String stageName,
+ @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx, String stageName,
Collection<T> evts) {
return F.asMap(ignite.cluster().localNode(), evts);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
index b780142..36f3c8e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRandomEventRouter.java
@@ -21,7 +21,7 @@ import java.util.*;
/**
* Random router. Routes event to random node.
*/
-public class GridStreamerRandomEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerRandomEventRouter extends StreamerEventRouterAdapter {
/** Optional predicates to exclude nodes from routing. */
private IgnitePredicate<ClusterNode>[] predicates;
@@ -56,7 +56,7 @@ public class GridStreamerRandomEventRouter extends GridStreamerEventRouterAdapte
}
/** {@inheritDoc} */
- @Override public ClusterNode route(GridStreamerContext ctx, String stageName, Object evt) {
+ @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
Collection<ClusterNode> nodes = F.view(ctx.projection().nodes(), predicates);
if (F.isEmpty(nodes))
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
index 4597405..2de11f2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/router/GridStreamerRoundRobinEventRouter.java
@@ -18,12 +18,12 @@ import java.util.concurrent.atomic.*;
/**
* Round robin router.
*/
-public class GridStreamerRoundRobinEventRouter extends GridStreamerEventRouterAdapter {
+public class GridStreamerRoundRobinEventRouter extends StreamerEventRouterAdapter {
/** */
private final AtomicLong lastOrder = new AtomicLong();
/** {@inheritDoc} */
- @Override public ClusterNode route(GridStreamerContext ctx, String stageName, Object evt) {
+ @Override public ClusterNode route(StreamerContext ctx, String stageName, Object evt) {
Collection<ClusterNode> nodes = ctx.projection().nodes();
int idx = (int)(lastOrder.getAndIncrement() % nodes.size());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
index 0e60c33..012b6c2 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
@@ -33,7 +33,7 @@
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" parent="base.grid.cfg">
<property name="streamerConfiguration">
<list>
- <bean class="org.gridgain.grid.streamer.GridStreamerConfiguration">
+ <bean class="org.gridgain.grid.streamer.StreamerConfiguration">
<property name="windows">
<bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedSizeWindow">
<property name="maximumSize" value="500"/>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
index 9a3c859..1b7aea0 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
@@ -40,7 +40,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
private Collection<GridStreamerStage> stages;
/** Event router. */
- private GridStreamerEventRouter router;
+ private StreamerEventRouter router;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -62,8 +62,8 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
/**
* @return Streamer configuration.
*/
- private GridStreamerConfiguration streamerConfiguration() {
- GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+ private StreamerConfiguration streamerConfiguration() {
+ StreamerConfiguration cfg = new StreamerConfiguration();
cfg.setRouter(router);
@@ -90,7 +90,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) throws GridException {
assert evts.size() == 1;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
index b4f7725..da98634 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
@@ -60,8 +60,8 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
/**
* @return Streamer configuration.
*/
- private GridStreamerConfiguration streamerConfiguration() {
- GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+ private StreamerConfiguration streamerConfiguration() {
+ StreamerConfiguration cfg = new StreamerConfiguration();
cfg.setAtLeastOnce(true);
@@ -77,7 +77,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
SC pass = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> objects) {
assert ctx.nextStageName() != null;
@@ -87,7 +87,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
};
SC put = new SC() {
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) {
ConcurrentMap<Object, AtomicInteger> cntrs = ctx.localSpace();
@@ -160,7 +160,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
IgniteStreamer streamer = grid(0).streamer(null);
- streamer.addStreamerFailureListener(new GridStreamerFailureListener() {
+ streamer.addStreamerFailureListener(new StreamerFailureListener() {
@Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
info("Unable to failover events [stageName=" + stageName + ", err=" + err + ']');
@@ -206,7 +206,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
/**
* Test random router.
*/
- private static class TestRandomRouter extends GridStreamerEventRouterAdapter {
+ private static class TestRandomRouter extends StreamerEventRouterAdapter {
/** Source node ID. */
private UUID srcNodeId;
@@ -214,7 +214,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
private UUID destNodeId;
/** {@inheritDoc} */
- @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
if ("put".equals(stageName))
return ctx.projection().node(destNodeId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
index 083af44..8b7d2b3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
@@ -20,12 +20,12 @@ import org.jetbrains.annotations.*;
import java.util.*;
/**
- * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link GridStreamerConfiguration}.
+ * Test for {@link org.apache.ignite.lifecycle.LifecycleAware} support in {@link org.gridgain.grid.streamer.StreamerConfiguration}.
*/
public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwareSelfTest {
/**
*/
- private static class TestEventRouter extends TestLifecycleAware implements GridStreamerEventRouter {
+ private static class TestEventRouter extends TestLifecycleAware implements StreamerEventRouter {
/**
*/
TestEventRouter() {
@@ -33,12 +33,12 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
}
/** {@inheritDoc} */
- @Nullable @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+ @Nullable @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
return null;
}
/** {@inheritDoc} */
- @Nullable @Override public <T> Map<ClusterNode, Collection<T>> route(GridStreamerContext ctx,
+ @Nullable @Override public <T> Map<ClusterNode, Collection<T>> route(StreamerContext ctx,
String stageName, Collection<T> evts) {
return null;
}
@@ -59,7 +59,7 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
}
/** {@inheritDoc} */
- @Nullable @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection evts) {
+ @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
return null;
}
}
@@ -177,7 +177,7 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
@Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- GridStreamerConfiguration streamerCfg = new GridStreamerConfiguration();
+ StreamerConfiguration streamerCfg = new StreamerConfiguration();
TestEventRouter router = new TestEventRouter();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
index 51986a9..7eb843f 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
@@ -51,7 +51,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
private Collection<GridStreamerStage> stages;
/** Event router. */
- private GridStreamerEventRouter router;
+ private StreamerEventRouter router;
/** P2P enabled flag. */
private boolean p2pEnabled;
@@ -81,8 +81,8 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
/**
* @return Streamer configuration.
*/
- private GridStreamerConfiguration streamerConfiguration() {
- GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+ private StreamerConfiguration streamerConfiguration() {
+ StreamerConfiguration cfg = new StreamerConfiguration();
cfg.setAtLeastOnce(atLeastOnce);
@@ -114,7 +114,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
return "name";
}
- @Nullable @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection evts) {
+ @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
assert g != null;
assert log != null;
@@ -155,7 +155,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts)
throws GridException {
String nextStage = ctx.nextStageName();
@@ -214,18 +214,18 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
// Wait until all acks are received.
GridTestUtils.retryAssert(log, 100, 50, new CA() {
@Override public void apply() {
- GridStreamerMetrics metrics = ignite0.streamer(null).metrics();
+ StreamerMetrics metrics = ignite0.streamer(null).metrics();
assertEquals(0, metrics.currentActiveSessions());
}
});
- GridStreamerMetrics metrics = ignite0.streamer(null).metrics();
+ StreamerMetrics metrics = ignite0.streamer(null).metrics();
assertTrue(metrics.maximumActiveSessions() > 0);
- ignite0.streamer(null).context().query(new IgniteClosure<GridStreamerContext, Object>() {
- @Override public Object apply(GridStreamerContext ctx) {
+ ignite0.streamer(null).context().query(new IgniteClosure<StreamerContext, Object>() {
+ @Override public Object apply(StreamerContext ctx) {
try {
U.sleep(1000);
}
@@ -265,7 +265,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) {
String nextStage = ctx.nextStageName();
@@ -325,7 +325,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) {
String nextStage = ctx.nextStageName();
@@ -385,7 +385,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) {
String nextStage = ctx.nextStageName();
@@ -411,7 +411,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
final CountDownLatch errLatch = new CountDownLatch(1);
- grid(0).streamer(null).addStreamerFailureListener(new GridStreamerFailureListener() {
+ grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() {
@Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
info("Expected failure: " + err.getMessage());
@@ -449,7 +449,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) throws GridException {
String nextStage = ctx.nextStageName();
@@ -530,7 +530,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) {
ConcurrentMap<String, AtomicInteger> space = ctx.localSpace();
@@ -605,12 +605,12 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
for (String s : stages)
assertEquals((Integer)sum, stagesSum.get(s));
- GridStreamerContext streamerCtx = grid(0).streamer(null).context();
+ StreamerContext streamerCtx = grid(0).streamer(null).context();
// Check query.
for (final String s : stages) {
- Collection<Integer> res = streamerCtx.query(new C1<GridStreamerContext, Integer>() {
- @Override public Integer apply(GridStreamerContext ctx) {
+ Collection<Integer> res = streamerCtx.query(new C1<StreamerContext, Integer>() {
+ @Override public Integer apply(StreamerContext ctx) {
AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s);
return cntr.get();
@@ -621,8 +621,8 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
// Check broadcast.
- streamerCtx.broadcast(new CI1<GridStreamerContext>() {
- @Override public void apply(GridStreamerContext ctx) {
+ streamerCtx.broadcast(new CI1<StreamerContext>() {
+ @Override public void apply(StreamerContext ctx) {
int sum = 0;
ConcurrentMap<String, AtomicInteger> space = ctx.localSpace();
@@ -652,8 +652,8 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
// Check reduce.
for (final String s : stages) {
Integer res = streamerCtx.reduce(
- new C1<GridStreamerContext, Integer>() {
- @Override public Integer apply(GridStreamerContext ctx) {
+ new C1<StreamerContext, Integer>() {
+ @Override public Integer apply(StreamerContext ctx) {
AtomicInteger cntr = ctx.<String, AtomicInteger>localSpace().get(s);
return cntr.get();
@@ -683,7 +683,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
SC stage = new SC() {
@SuppressWarnings("unchecked")
- @Override public Map<String, Collection<?>> applyx(String stageName, GridStreamerContext ctx,
+ @Override public Map<String, Collection<?>> applyx(String stageName, StreamerContext ctx,
Collection<Object> evts) {
return ctx.nextStageName() == null ? null : (Map)F.asMap(ctx.nextStageName(), F.asList(0));
}
@@ -699,7 +699,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
final CountDownLatch errLatch = new CountDownLatch(errCnt);
- grid(0).streamer(null).addStreamerFailureListener(new GridStreamerFailureListener() {
+ grid(0).streamer(null).addStreamerFailureListener(new StreamerFailureListener() {
@Override public void onFailure(String stageName, Collection<Object> evts, Throwable err) {
info("Expected failure: " + err.getMessage());
@@ -726,7 +726,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
private void checkMetrics(Ignite ignite, String stage, int evtCnt, boolean pipeline) {
IgniteStreamer streamer = ignite.streamer(null);
- GridStreamerMetrics metrics = streamer.metrics();
+ StreamerMetrics metrics = streamer.metrics();
assertEquals(evtCnt, metrics.stageTotalExecutionCount());
assertEquals(0, metrics.stageWaitingExecutionCount());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
index a8585b3..5e6f4f7 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
@@ -39,7 +39,7 @@ class GridTestStage implements GridStreamerStage<Object> {
}
/** {@inheritDoc} */
- @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection<Object> evts)
+ @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Object> evts)
throws GridException {
return stageClos.apply(name(), ctx, evts);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
index 99e7887..fd9b642 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStreamerEventRouter.java
@@ -17,7 +17,7 @@ import java.util.*;
/**
* Test router.
*/
-class GridTestStreamerEventRouter extends GridStreamerEventRouterAdapter {
+class GridTestStreamerEventRouter extends StreamerEventRouterAdapter {
/** Route table. */
private Map<String, UUID> routeTbl = new HashMap<>();
@@ -30,7 +30,7 @@ class GridTestStreamerEventRouter extends GridStreamerEventRouterAdapter {
}
/** {@inheritDoc} */
- @Override public <T> ClusterNode route(GridStreamerContext ctx, String stageName, T evt) {
+ @Override public <T> ClusterNode route(StreamerContext ctx, String stageName, T evt) {
UUID nodeId = routeTbl.get(stageName);
if (nodeId == null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
index 1e0e079..ab33222 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/SC.java
@@ -18,6 +18,6 @@ import java.util.*;
* Typedef for generic closure.
*/
abstract class SC
- extends GridClosure3X<String, GridStreamerContext, Collection<Object>, Map<String, Collection<?>>> {
+ extends GridClosure3X<String, StreamerContext, Collection<Object>, Map<String, Collection<?>>> {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
index 92aefe0..812ddb7 100644
--- a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
@@ -112,18 +112,18 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest
/**
* @return Streamer configuration.
*/
- private static GridStreamerConfiguration streamerConfiguration() {
+ private static StreamerConfiguration streamerConfiguration() {
Collection<GridStreamerStage> stages = F.<GridStreamerStage>asList(new GridStreamerStage() {
@Override public String name() {
return "name";
}
- @Nullable @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection evts) {
+ @Nullable @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection evts) {
return null;
}
});
- GridStreamerConfiguration cfg = new GridStreamerConfiguration();
+ StreamerConfiguration cfg = new StreamerConfiguration();
cfg.setAtLeastOnce(true);
cfg.setWindows(F.asList((GridStreamerWindow)new GridStreamerUnboundedWindow()));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fe37fb/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java b/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
index be07734..e6f75ac 100644
--- a/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
+++ b/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
@@ -26,7 +26,7 @@ class TestStage implements GridStreamerStage<Integer> {
}
/** {@inheritDoc} */
- @Override public Map<String, Collection<?>> run(GridStreamerContext ctx, Collection<Integer> evts)
+ @Override public Map<String, Collection<?>> run(StreamerContext ctx, Collection<Integer> evts)
throws GridException {
ConcurrentMap<String, TestAverage> loc = ctx.localSpace();