You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:11 UTC
[10/32] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java
deleted file mode 100644
index 461d618..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindow.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.index.*;
-import org.gridgain.grid.streamer.window.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer rolling window. Rolling windows allow new event to come in, as well as automatically
- * evicting obsolete events on the other side. Windows allow to query into specific time-frame
- * or specific sample size of the events. With windows, you can answer questions like "What
- * are my top 10 best selling products over last 24 hours?", or "Who are my top 10 users out of
- * last 1,000,000 users who logged in?"
- * <p>
- * GridGain comes with following rolling windows implementations out of the box:
- * <ul>
- * <li>{@link GridStreamerUnboundedWindow}</li>
- * <li>{@link GridStreamerBoundedSizeWindow}</li>
- * <li>{@link GridStreamerBoundedSizeBatchWindow}</li>
- * <li>{@link GridStreamerBoundedSizeSortedWindow}</li>
- * <li>{@link GridStreamerBoundedTimeWindow}</li>
- * <li>{@link GridStreamerBoundedTimeBatchWindow}</li>
- * </ul>
- * <p>
- * Streamer window is configured via {@link StreamerConfiguration#getWindows()} method.
- */
-public interface GridStreamerWindow<E> extends Iterable<E> {
- /**
- * Gets window name.
- *
- * @return Window name.
- */
- public String name();
-
- /**
- * Gets default index, if default index is not configured then
- * {@link IllegalArgumentException} will be thrown.
- *
- * @param <K> Type of the index key.
- * @param <V> Type of the index value.
- * @return Index with default name.
- */
- public <K, V> GridStreamerIndex<E, K, V> index();
-
- /**
- * Gets index by name, if not index with such name was configured then
- * {@link IllegalArgumentException} will be thrown.
- *
- * @param name Name of the index, if {@code null} then analogous to {@link #index()}.
- * @param <K> Type of the index key.
- * @param <V> Type of the index value.
- * @return Index with a given name.
- */
- public <K, V> GridStreamerIndex<E, K, V> index(@Nullable String name);
-
- /**
- * Gets all indexes configured for this window.
- *
- * @return All indexes configured for this window or empty collection, if no
- * indexes were configured.
- */
- public Collection<GridStreamerIndex<E, ?, ?>> indexes();
-
- /**
- * Resets window. Usually will clear all events from window.
- */
- public void reset();
-
- /**
- * Gets number of events currently stored in window.
- *
- * @return Current size of the window.
- */
- public int size();
-
- /**
- * Gets number of entries available for eviction.
- *
- * @return Number of entries available for eviction.
- */
- public int evictionQueueSize();
-
- /**
- * Adds single event to window.
- *
- * @param evt Event to add.
- * @return {@code True} if event was added.
- * @throws GridException If index update failed.
- */
- public boolean enqueue(E evt) throws GridException;
-
- /**
- * Adds events to window.
- *
- * @param evts Events to add.
- * @return {@code}
- * @throws GridException If index update failed.
- */
- public boolean enqueue(E... evts) throws GridException;
-
- /**
- * Adds all events to window.
- *
- * @param evts Collection of events to add.
- * @return {@code True} if all events were added, {@code false} if at
- * least 1 event was skipped.
- * @throws GridException If index update failed.
- */
- public boolean enqueueAll(Collection<E> evts) throws GridException;
-
- /**
- * Dequeues last element from windows. Will return {@code null} if window is empty.
- *
- * @return Dequeued element.
- * @throws GridException If index update failed.
- */
- @Nullable public E dequeue() throws GridException;
-
- /**
- * Dequeues up to {@code cnt} elements from window. If current window size is less than {@code cnt},
- * will dequeue all elements from window.
- *
- * @param cnt Count to dequeue.
- * @return Collection of dequeued elements.
- * @throws GridException If index update failed.
- */
- public Collection<E> dequeue(int cnt) throws GridException;
-
- /**
- * Dequeues all elements from window.
- *
- * @return Collection of dequeued elements.
- * @throws GridException If index update failed.
- */
- public Collection<E> dequeueAll() throws GridException;
-
- /**
- * If window supports eviction, this method will return next evicted element.
- *
- * @return Polls and returns next evicted event or {@code null} if eviction queue is empty or if
- * window does not support eviction.
- * @throws GridException If index update failed.
- */
- @Nullable public E pollEvicted() throws GridException;
-
- /**
- * If window supports eviction, this method will return up to {@code cnt} evicted elements.
- *
- * @param cnt Number of elements to evict.
- * @return Collection of evicted elements.
- * @throws GridException If index update failed.
- */
- public Collection<E> pollEvicted(int cnt) throws GridException;
-
- /**
- * If window supports batch eviction, this method will poll next evicted batch from window.
- * If windows does not support batch eviction but supports eviction, will return collection of single
- * last evicted element.
- * If window does not support eviction, will return empty collection.
- *
- * @return Next evicted batch.
- * @throws GridException If index update failed.
- */
- public Collection<E> pollEvictedBatch() throws GridException;
-
- /**
- * If window supports eviction, this method will return all available evicted elements.
- *
- * @return Collection of evicted elements.
- * @throws GridException If index update failed.
- */
- public Collection<E> pollEvictedAll() throws GridException;
-
- /**
- * Clears all evicted entries.
- *
- * @throws GridException If index update failed.
- */
- public void clearEvicted() throws GridException;
-
- /**
- * Create window snapshot. Evicted entries are not included.
- *
- * @param includeIvicted Whether to include evicted entries or not.
- * @return Window snapshot.
- */
- public Collection<E> snapshot(boolean includeIvicted);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMBean.java
deleted file mode 100644
index 8cd999d..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMBean.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-import org.apache.ignite.mbean.*;
-
-/**
- * Streamer window MBean.
- */
-@IgniteMBeanDescription("MBean that provides access to streamer window description.")
-public interface GridStreamerWindowMBean {
- /**
- * Gets window name.
- *
- * @return Window name.
- */
- @IgniteMBeanDescription("Window name.")
- public String getName();
-
- /**
- * Gets window class name.
- *
- * @return Window class name.
- */
- @IgniteMBeanDescription("Window class name.")
- public String getClassName();
-
- /**
- * Gets current window size.
- *
- * @return Current window size.
- */
- @IgniteMBeanDescription("Window size.")
- public int getSize();
-
- /**
- * Gets estimate for window eviction queue size.
- *
- * @return Eviction queue size estimate.
- */
- @IgniteMBeanDescription("Eviction queue size estimate.")
- public int getEvictionQueueSize();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMetrics.java
deleted file mode 100644
index 74fe333..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/GridStreamerWindowMetrics.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer;
-
-/**
- * Streamer window metrics.
- */
-public interface GridStreamerWindowMetrics {
- /**
- * Gets window name.
- *
- * @return Window name.
- */
- public String name();
-
- /**
- * Gets window size.
- *
- * @return Window size.
- */
- public int size();
-
- /**
- * Gets eviction queue size.
- *
- * @return Eviction queue size.
- */
- public int evictionQueueSize();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java
index 2c7fe45..9576a01 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerConfiguration.java
@@ -30,14 +30,14 @@ public class StreamerConfiguration {
private String name;
/** Window. */
- private Collection<GridStreamerWindow> win;
+ private Collection<StreamerWindow> win;
/** Router. */
private StreamerEventRouter router;
/** Stages. */
@GridToStringInclude
- private Collection<GridStreamerStage> stages;
+ private Collection<StreamerStage> stages;
/** At least once flag. */
private boolean atLeastOnce;
@@ -119,7 +119,7 @@ public class StreamerConfiguration {
*
* @return Streamer windows.
*/
- public Collection<GridStreamerWindow> getWindows() {
+ public Collection<StreamerWindow> getWindows() {
return win;
}
@@ -128,7 +128,7 @@ public class StreamerConfiguration {
*
* @param win Window.
*/
- public void setWindows(Collection<GridStreamerWindow> win) {
+ public void setWindows(Collection<StreamerWindow> win) {
this.win = win;
}
@@ -138,7 +138,7 @@ public class StreamerConfiguration {
*
* @return Collection of streamer stages.
*/
- public Collection<GridStreamerStage> getStages() {
+ public Collection<StreamerStage> getStages() {
return stages;
}
@@ -147,7 +147,7 @@ public class StreamerConfiguration {
*
* @param stages Stages.
*/
- public void setStages(Collection<GridStreamerStage> stages) {
+ public void setStages(Collection<StreamerStage> stages) {
this.stages = stages;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java
index f8828a3..d6f720c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerContext.java
@@ -40,7 +40,7 @@ public interface StreamerContext {
*
* @return Default window.
*/
- public <E> GridStreamerWindow<E> window();
+ public <E> StreamerWindow<E> window();
/**
* Gets streamer event window by window name, if no window with such
@@ -49,10 +49,10 @@ public interface StreamerContext {
* @param winName Window name.
* @return Window instance.
*/
- public <E> GridStreamerWindow<E> window(String winName);
+ public <E> StreamerWindow<E> window(String winName);
/**
- * For context passed to {@link GridStreamerStage#run(StreamerContext, Collection)} this method will
+ * For context passed to {@link StreamerStage#run(StreamerContext, Collection)} this method will
* return next stage name in execution pipeline. For context obtained from streamer object, this method will
* return first stage name.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
index eaeb2f1..4d47491 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerMetrics.java
@@ -174,14 +174,14 @@ public interface StreamerMetrics {
* @param stageName Stage name.
* @return Stage metrics.
*/
- public GridStreamerStageMetrics stageMetrics(String stageName);
+ public StreamerStageMetrics stageMetrics(String stageName);
/**
* Gets metrics for all stages. Stage metrics order is the same as stage order in configuration.
*
* @return Stage metrics collection.
*/
- public Collection<GridStreamerStageMetrics> stageMetrics();
+ public Collection<StreamerStageMetrics> stageMetrics();
/**
* Gets current window metrics, if window with given name is not configured
@@ -190,12 +190,12 @@ public interface StreamerMetrics {
* @param winName Window name.
* @return Window metrics.
*/
- public GridStreamerWindowMetrics windowMetrics(String winName);
+ public StreamerWindowMetrics windowMetrics(String winName);
/**
* Gets metrics for all windows.
*
* @return Collection of window metrics.
*/
- public Collection<GridStreamerWindowMetrics> windowMetrics();
+ public Collection<StreamerWindowMetrics> windowMetrics();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStage.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStage.java
new file mode 100644
index 0000000..fc01059
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStage.java
@@ -0,0 +1,53 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.gridgain.grid.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Streamer stage is a component that determines event processing flow. User logic related to
+ * any particular event processing is implemented by streamer stage. A stage takes events as
+ * an input and returns groups of events mapped to different stages as an output. Events for
+ * every returned stage will be passed to {@link StreamerEventRouter} which will determine
+ * on which node the stage should be executed.
+ * <p>
+ * Generally, event stage execution graph if fully controlled by return values of
+ * this method, while node execution graph is controlled by
+ * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
+ */
+public interface StreamerStage<IN> {
+ /**
+ * Gets streamer stage name.
+ *
+ * @return Name of the stage.
+ */
+ public String name();
+
+ /**
+ * Stage execution routine. After the passed in events are processed, stage can emit
+ * another set of events to be processed. The returned events can be mapped to different
+ * stages. Events for every returned stage will be passed to {@link StreamerEventRouter}
+ * which will determine on which node the stage should be executed.
+ * <p>
+ * Generally, event stage execution graph if fully controlled by return values of
+ * this method, while node execution graph is controlled by
+ * {@link StreamerEventRouter#route(StreamerContext, String, Object)} method.
+ *
+ * @param ctx Streamer context.
+ * @param evts Input events.
+ * @return Map of stage name to collection of events.
+ * @throws GridException If failed.
+ */
+ @Nullable public Map<String, Collection<?>> run(StreamerContext ctx, Collection<IN> evts)
+ throws GridException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMBean.java
new file mode 100644
index 0000000..e2be4c2
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMBean.java
@@ -0,0 +1,106 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.apache.ignite.mbean.*;
+
+/**
+ * Streamer stage MBean.
+ */
+@IgniteMBeanDescription("MBean that provides access to streamer stage description and metrics.")
+public interface StreamerStageMBean {
+ /**
+ * Gets stage name.
+ *
+ * @return Stage name.
+ */
+ @IgniteMBeanDescription("Stage name.")
+ public String getName();
+
+ /**
+ * Gets stage class name.
+ *
+ * @return Stage class name.
+ */
+ @IgniteMBeanDescription("Stage class name.")
+ public String getStageClassName();
+
+ /**
+ * Gets stage minimum execution time.
+ *
+ * @return Stage minimum execution time.
+ */
+ @IgniteMBeanDescription("Stage minimum execution time.")
+ public long getMinimumExecutionTime();
+
+ /**
+ * Gets stage maximum execution time.
+ *
+ * @return Stage maximum execution time.
+ */
+ @IgniteMBeanDescription("Stage maximum execution time.")
+ public long getMaximumExecutionTime();
+
+ /**
+ * Gets stage average execution time.
+ *
+ * @return Stage average execution time.
+ */
+ @IgniteMBeanDescription("Stage average execution time.")
+ public long getAverageExecutionTime();
+
+ /**
+ * Gets stage minimum waiting time.
+ *
+ * @return Stage minimum waiting time.
+ */
+ @IgniteMBeanDescription("Stage minimum waiting time.")
+ public long getMinimumWaitingTime();
+
+ /**
+ * Gets stage maximum waiting time.
+ *
+ * @return Stage maximum waiting time.
+ */
+ @IgniteMBeanDescription("Stage maximum waiting time.")
+ public long getMaximumWaitingTime();
+
+ /**
+ * Stage average waiting time.
+ *
+ * @return Stage average waiting time.
+ */
+ @IgniteMBeanDescription("Stage average waiting time.")
+ public long getAverageWaitingTime();
+
+ /**
+ * Gets total stage execution count since last reset.
+ *
+ * @return Number of times this stage was executed.
+ */
+ @IgniteMBeanDescription("Number of times this stage was executed.")
+ public long getTotalExecutionCount();
+
+ /**
+ * Gets stage failure count.
+ *
+ * @return Stage failure count.
+ */
+ @IgniteMBeanDescription("Stage failure count.")
+ public int getFailuresCount();
+
+ /**
+ * Gets flag indicating if stage is being currently executed by at least one thread on current node.
+ *
+ * @return {@code True} if stage is executing now.
+ */
+ @IgniteMBeanDescription("Whether stage is currently being executed.")
+ public boolean isExecuting();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMetrics.java
new file mode 100644
index 0000000..0cbc3c8
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerStageMetrics.java
@@ -0,0 +1,85 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+/**
+ * Streamer stage metrics.
+ */
+public interface StreamerStageMetrics {
+ /**
+ * Gets stage name.
+ *
+ * @return Stage name.
+ */
+ public String name();
+
+ /**
+ * Gets stage minimum execution time.
+ *
+ * @return Stage minimum execution time.
+ */
+ public long minimumExecutionTime();
+
+ /**
+ * Gets stage maximum execution time.
+ *
+ * @return Stage maximum execution time.
+ */
+ public long maximumExecutionTime();
+
+ /**
+ * Gets stage average execution time.
+ *
+ * @return Stage average execution time.
+ */
+ public long averageExecutionTime();
+
+ /**
+ * Gets stage minimum waiting time.
+ *
+ * @return Stage minimum waiting time.
+ */
+ public long minimumWaitingTime();
+
+ /**
+ * Gets stage maximum waiting time.
+ *
+ * @return Stage maximum waiting time.
+ */
+ public long maximumWaitingTime();
+
+ /**
+ * Stage average waiting time.
+ *
+ * @return Stage average waiting time.
+ */
+ public long averageWaitingTime();
+
+ /**
+ * Gets total stage execution count since last reset.
+ *
+ * @return Number of times this stage was executed.
+ */
+ public long totalExecutionCount();
+
+ /**
+ * Gets stage failure count.
+ *
+ * @return Stage failure count.
+ */
+ public int failuresCount();
+
+ /**
+ * Gets flag indicating if stage is being currently executed by at least one thread on current node.
+ *
+ * @return {@code True} if stage is executing now.
+ */
+ public boolean executing();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
new file mode 100644
index 0000000..4f2b462
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
@@ -0,0 +1,199 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.streamer.index.*;
+import org.gridgain.grid.streamer.window.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Streamer rolling window. Rolling windows allow new event to come in, as well as automatically
+ * evicting obsolete events on the other side. Windows allow to query into specific time-frame
+ * or specific sample size of the events. With windows, you can answer questions like "What
+ * are my top 10 best selling products over last 24 hours?", or "Who are my top 10 users out of
+ * last 1,000,000 users who logged in?"
+ * <p>
+ * GridGain comes with following rolling windows implementations out of the box:
+ * <ul>
+ * <li>{@link GridStreamerUnboundedWindow}</li>
+ * <li>{@link GridStreamerBoundedSizeWindow}</li>
+ * <li>{@link GridStreamerBoundedSizeBatchWindow}</li>
+ * <li>{@link GridStreamerBoundedSizeSortedWindow}</li>
+ * <li>{@link GridStreamerBoundedTimeWindow}</li>
+ * <li>{@link GridStreamerBoundedTimeBatchWindow}</li>
+ * </ul>
+ * <p>
+ * Streamer window is configured via {@link StreamerConfiguration#getWindows()} method.
+ */
+public interface StreamerWindow<E> extends Iterable<E> {
+ /**
+ * Gets window name.
+ *
+ * @return Window name.
+ */
+ public String name();
+
+ /**
+ * Gets default index, if default index is not configured then
+ * {@link IllegalArgumentException} will be thrown.
+ *
+ * @param <K> Type of the index key.
+ * @param <V> Type of the index value.
+ * @return Index with default name.
+ */
+ public <K, V> GridStreamerIndex<E, K, V> index();
+
+ /**
+ * Gets index by name, if not index with such name was configured then
+ * {@link IllegalArgumentException} will be thrown.
+ *
+ * @param name Name of the index, if {@code null} then analogous to {@link #index()}.
+ * @param <K> Type of the index key.
+ * @param <V> Type of the index value.
+ * @return Index with a given name.
+ */
+ public <K, V> GridStreamerIndex<E, K, V> index(@Nullable String name);
+
+ /**
+ * Gets all indexes configured for this window.
+ *
+ * @return All indexes configured for this window or empty collection, if no
+ * indexes were configured.
+ */
+ public Collection<GridStreamerIndex<E, ?, ?>> indexes();
+
+ /**
+ * Resets window. Usually will clear all events from window.
+ */
+ public void reset();
+
+ /**
+ * Gets number of events currently stored in window.
+ *
+ * @return Current size of the window.
+ */
+ public int size();
+
+ /**
+ * Gets number of entries available for eviction.
+ *
+ * @return Number of entries available for eviction.
+ */
+ public int evictionQueueSize();
+
+ /**
+ * Adds single event to window.
+ *
+ * @param evt Event to add.
+ * @return {@code True} if event was added.
+ * @throws GridException If index update failed.
+ */
+ public boolean enqueue(E evt) throws GridException;
+
+ /**
+ * Adds events to window.
+ *
+ * @param evts Events to add.
+ * @return {@code}
+ * @throws GridException If index update failed.
+ */
+ public boolean enqueue(E... evts) throws GridException;
+
+ /**
+ * Adds all events to window.
+ *
+ * @param evts Collection of events to add.
+ * @return {@code True} if all events were added, {@code false} if at
+ * least 1 event was skipped.
+ * @throws GridException If index update failed.
+ */
+ public boolean enqueueAll(Collection<E> evts) throws GridException;
+
+ /**
+ * Dequeues last element from windows. Will return {@code null} if window is empty.
+ *
+ * @return Dequeued element.
+ * @throws GridException If index update failed.
+ */
+ @Nullable public E dequeue() throws GridException;
+
+ /**
+ * Dequeues up to {@code cnt} elements from window. If current window size is less than {@code cnt},
+ * will dequeue all elements from window.
+ *
+ * @param cnt Count to dequeue.
+ * @return Collection of dequeued elements.
+ * @throws GridException If index update failed.
+ */
+ public Collection<E> dequeue(int cnt) throws GridException;
+
+ /**
+ * Dequeues all elements from window.
+ *
+ * @return Collection of dequeued elements.
+ * @throws GridException If index update failed.
+ */
+ public Collection<E> dequeueAll() throws GridException;
+
+ /**
+ * If window supports eviction, this method will return next evicted element.
+ *
+ * @return Polls and returns next evicted event or {@code null} if eviction queue is empty or if
+ * window does not support eviction.
+ * @throws GridException If index update failed.
+ */
+ @Nullable public E pollEvicted() throws GridException;
+
+ /**
+ * If window supports eviction, this method will return up to {@code cnt} evicted elements.
+ *
+ * @param cnt Number of elements to evict.
+ * @return Collection of evicted elements.
+ * @throws GridException If index update failed.
+ */
+ public Collection<E> pollEvicted(int cnt) throws GridException;
+
+ /**
+ * If window supports batch eviction, this method will poll next evicted batch from window.
+ * If windows does not support batch eviction but supports eviction, will return collection of single
+ * last evicted element.
+ * If window does not support eviction, will return empty collection.
+ *
+ * @return Next evicted batch.
+ * @throws GridException If index update failed.
+ */
+ public Collection<E> pollEvictedBatch() throws GridException;
+
+ /**
+ * If window supports eviction, this method will return all available evicted elements.
+ *
+ * @return Collection of evicted elements.
+ * @throws GridException If index update failed.
+ */
+ public Collection<E> pollEvictedAll() throws GridException;
+
+ /**
+ * Clears all evicted entries.
+ *
+ * @throws GridException If index update failed.
+ */
+ public void clearEvicted() throws GridException;
+
+ /**
+ * Create window snapshot. Evicted entries are not included.
+ *
+ * @param includeIvicted Whether to include evicted entries or not.
+ * @return Window snapshot.
+ */
+ public Collection<E> snapshot(boolean includeIvicted);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMBean.java
new file mode 100644
index 0000000..16ff58e
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMBean.java
@@ -0,0 +1,50 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+import org.apache.ignite.mbean.*;
+
+/**
+ * Streamer window MBean.
+ */
+@IgniteMBeanDescription("MBean that provides access to streamer window description.")
+public interface StreamerWindowMBean {
+ /**
+ * Gets window name.
+ *
+ * @return Window name.
+ */
+ @IgniteMBeanDescription("Window name.")
+ public String getName();
+
+ /**
+ * Gets window class name.
+ *
+ * @return Window class name.
+ */
+ @IgniteMBeanDescription("Window class name.")
+ public String getClassName();
+
+ /**
+ * Gets current window size.
+ *
+ * @return Current window size.
+ */
+ @IgniteMBeanDescription("Window size.")
+ public int getSize();
+
+ /**
+ * Gets estimate for window eviction queue size.
+ *
+ * @return Eviction queue size estimate.
+ */
+ @IgniteMBeanDescription("Eviction queue size estimate.")
+ public int getEvictionQueueSize();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMetrics.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMetrics.java
new file mode 100644
index 0000000..e373866
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindowMetrics.java
@@ -0,0 +1,36 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer;
+
+/**
+ * Streamer window metrics.
+ */
+public interface StreamerWindowMetrics {
+ /**
+ * Gets window name.
+ *
+ * @return Window name.
+ */
+ public String name();
+
+ /**
+ * Gets window size.
+ *
+ * @return Window size.
+ */
+ public int size();
+
+ /**
+ * Gets eviction queue size.
+ *
+ * @return Eviction queue size.
+ */
+ public int evictionQueueSize();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
index c5dcd5f..618d800 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
@@ -9,7 +9,6 @@
package org.gridgain.grid.streamer.index;
-import org.gridgain.grid.streamer.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -17,11 +16,11 @@ import java.util.*;
/**
* User view on streamer index. Streamer indexes are used for fast look ups into streamer windows.
* <p>
- * Streamer index can be accessed from {@link GridStreamerWindow} via any of the following methods:
+ * Streamer index can be accessed from {@link org.gridgain.grid.streamer.StreamerWindow} via any of the following methods:
* <ul>
- * <li>{@link GridStreamerWindow#index()}</li>
- * <li>{@link GridStreamerWindow#index(String)}</li>
- * <li>{@link GridStreamerWindow#indexes()}</li>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index()}</li>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index(String)}</li>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#indexes()}</li>
* </ul>
* <p>
* Indexes are created and provided for streamer windows by {@link GridStreamerIndexProvider} which is
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
index dbd6c0d..43030fc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
@@ -10,11 +10,10 @@
package org.gridgain.grid.streamer.index;
import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.*;
import org.gridgain.grid.streamer.window.*;
/**
- * Represents an actual instance of an index. Used by a {@link GridStreamerWindow}
+ * Represents an actual instance of an index. Used by a {@link org.gridgain.grid.streamer.StreamerWindow}
* to perform event indexing.
* <p>
* To configure index for a streamer window, use
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerWindowAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerWindowAdapter.java
index 0d15900..e74d6c8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerWindowAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerWindowAdapter.java
@@ -25,8 +25,8 @@ import java.util.*;
/**
* Streamer window adapter.
*/
-public abstract class GridStreamerWindowAdapter<E> implements LifecycleAware, GridStreamerWindow<E>,
- GridStreamerWindowMBean {
+public abstract class GridStreamerWindowAdapter<E> implements LifecycleAware, StreamerWindow<E>,
+ StreamerWindowMBean {
/** Default window name. */
private String name = getClass().getSimpleName();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
index 1b7aea0..46e399a 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
@@ -37,7 +37,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
private static final int EVENTS_COUNT = 10;
/** Test stages. */
- private Collection<GridStreamerStage> stages;
+ private Collection<StreamerStage> stages;
/** Event router. */
private StreamerEventRouter router;
@@ -72,7 +72,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
window.setName("window1");
window.setTimeInterval(60000);
- cfg.setWindows(F.asList((GridStreamerWindow)window));
+ cfg.setWindows(F.asList((StreamerWindow)window));
cfg.setStages(stages);
@@ -100,7 +100,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
return null;
}
- GridStreamerWindow win = ctx.window("window1");
+ StreamerWindow win = ctx.window("window1");
// Add new events to the window.
win.enqueueAll(evts);
@@ -128,7 +128,7 @@ public class GridStreamerEvictionSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
+ stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
startGrids(2);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
index da98634..49c321c 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
@@ -71,7 +71,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
window.setMaximumSize(100);
- cfg.setWindows(F.asList((GridStreamerWindow)window));
+ cfg.setWindows(F.asList((StreamerWindow)window));
cfg.setMaximumConcurrentSessions(maxConcurrentSess);
@@ -104,7 +104,7 @@ public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
}
};
- cfg.setStages(F.asList((GridStreamerStage)new GridTestStage("pass", pass), new GridTestStage("put", put)));
+ cfg.setStages(F.asList((StreamerStage)new GridTestStage("pass", pass), new GridTestStage("put", put)));
return cfg;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
index 8b7d2b3..a5ffff6 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerLifecycleAwareSelfTest.java
@@ -46,7 +46,7 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
/**
*/
- private static class TestStage extends TestLifecycleAware implements GridStreamerStage {
+ private static class TestStage extends TestLifecycleAware implements StreamerStage {
/**
*/
TestStage() {
@@ -66,7 +66,7 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
/**
*/
- private static class TestWindow extends TestLifecycleAware implements GridStreamerWindow {
+ private static class TestWindow extends TestLifecycleAware implements StreamerWindow {
/**
*/
TestWindow() {
@@ -187,13 +187,13 @@ public class GridStreamerLifecycleAwareSelfTest extends GridAbstractLifecycleAwa
TestStage stage = new TestStage();
- streamerCfg.setStages(F.asList((GridStreamerStage)stage));
+ streamerCfg.setStages(F.asList((StreamerStage)stage));
lifecycleAwares.add(stage);
TestWindow window = new TestWindow();
- streamerCfg.setWindows(F.asList((GridStreamerWindow)window));
+ streamerCfg.setWindows(F.asList((StreamerWindow)window));
lifecycleAwares.add(window);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
index 7eb843f..6bdef1d 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
@@ -48,7 +48,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
private boolean atLeastOnce = true;
/** Test stages. */
- private Collection<GridStreamerStage> stages;
+ private Collection<StreamerStage> stages;
/** Event router. */
private StreamerEventRouter router;
@@ -88,7 +88,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
cfg.setRouter(router);
- cfg.setWindows(F.asList((GridStreamerWindow)new GridStreamerUnboundedWindow()));
+ cfg.setWindows(F.asList((StreamerWindow)new GridStreamerUnboundedWindow()));
cfg.setStages(stages);
@@ -103,7 +103,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
final CountDownLatch finishLatch = new CountDownLatch(evtCnt);
- stages = F.<GridStreamerStage>asList(new GridStreamerStage() {
+ stages = F.<StreamerStage>asList(new StreamerStage() {
@IgniteInstanceResource
private Ignite g;
@@ -172,7 +172,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
+ stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
new GridTestStage("c", stage));
startGrids(4);
@@ -289,7 +289,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage),
+ stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage),
new GridTestStage("2", stage), new GridTestStage("3", stage), new GridTestStage("4", stage));
startGrids(4);
@@ -338,7 +338,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
+ stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
startGrids(2);
@@ -402,7 +402,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
+ stages = F.asList((StreamerStage)new GridTestStage("0", stage), new GridTestStage("1", stage));
startGrids(2);
@@ -465,7 +465,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
+ stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
new GridTestStage("c", stage));
router = new GridTestStreamerEventRouter();
atLeastOnce = true;
@@ -554,7 +554,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
+ stages = F.asList((StreamerStage)new GridTestStage("a", stage), new GridTestStage("b", stage),
new GridTestStage("c", stage), new GridTestStage("d", stage));
startGrids(4);
@@ -689,7 +689,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
}
};
- stages = F.asList((GridStreamerStage)new GridTestStage("0", stage),new GridTestStage("1", stage),
+ stages = F.asList((StreamerStage)new GridTestStage("0", stage),new GridTestStage("1", stage),
new GridTestStage("2", stage));
startGrids(1);
@@ -753,7 +753,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
assertEquals(0, metrics.pipelineAverageExecutionTime());
}
- GridStreamerStageMetrics stageMetrics = streamer.metrics().stageMetrics(stage);
+ StreamerStageMetrics stageMetrics = streamer.metrics().stageMetrics(stage);
assertNotNull(stageMetrics);
@@ -773,7 +773,7 @@ public class GridStreamerSelfTest extends GridCommonAbstractTest {
for (String stage : stages) {
IgniteStreamer streamer = ignite.streamer(null);
- GridStreamerStageMetrics metrics = streamer.metrics().stageMetrics(stage);
+ StreamerStageMetrics metrics = streamer.metrics().stageMetrics(stage);
assertNotNull(metrics);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
index 5e6f4f7..397160a 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridTestStage.java
@@ -17,7 +17,7 @@ import java.util.*;
/**
* Test stage.
*/
-class GridTestStage implements GridStreamerStage<Object> {
+class GridTestStage implements StreamerStage<Object> {
/** Stage name. */
private String name;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
index 812ddb7..054ae6d 100644
--- a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
@@ -113,7 +113,7 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest
* @return Streamer configuration.
*/
private static StreamerConfiguration streamerConfiguration() {
- Collection<GridStreamerStage> stages = F.<GridStreamerStage>asList(new GridStreamerStage() {
+ Collection<StreamerStage> stages = F.<StreamerStage>asList(new StreamerStage() {
@Override public String name() {
return "name";
}
@@ -126,7 +126,7 @@ public abstract class GridMarshallerAbstractTest extends GridCommonAbstractTest
StreamerConfiguration cfg = new StreamerConfiguration();
cfg.setAtLeastOnce(true);
- cfg.setWindows(F.asList((GridStreamerWindow)new GridStreamerUnboundedWindow()));
+ cfg.setWindows(F.asList((StreamerWindow)new GridStreamerUnboundedWindow()));
cfg.setStages(stages);
return cfg;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
index c6a542d..e0f76d2 100644
--- a/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
@@ -702,7 +702,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest {
* @param win Window.
* @throws Exception If failed.
*/
- private void checkIterator(GridStreamerWindow<Integer> win) throws Exception {
+ private void checkIterator(StreamerWindow<Integer> win) throws Exception {
win.reset();
assert win.size() == 0;
@@ -751,7 +751,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest {
* @param maxSize Max window size.
* @throws GridException If failed.
*/
- private void finalChecks(GridStreamerWindow<Integer> win, int maxSize) throws GridException {
+ private void finalChecks(StreamerWindow<Integer> win, int maxSize) throws GridException {
int evictQueueSize = win.evictionQueueSize();
info("Eviction queue size for final checks: " + evictQueueSize);
@@ -775,7 +775,7 @@ public class GridStreamerWindowSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
private void checkWindowMultithreaded(
- final GridStreamerWindow<Integer> win,
+ final StreamerWindow<Integer> win,
final int iterCnt,
int threadCnt,
final int range
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34694f39/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java b/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
index e6f75ac..1daa465 100644
--- a/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
+++ b/modules/core/src/test/java/org/gridgain/loadtests/streamer/average/TestStage.java
@@ -19,7 +19,7 @@ import java.util.concurrent.*;
/**
* Stage for average benchmark.
*/
-class TestStage implements GridStreamerStage<Integer> {
+class TestStage implements StreamerStage<Integer> {
/** {@inheritDoc} */
@Override public String name() {
return "stage";
@@ -38,7 +38,7 @@ class TestStage implements GridStreamerStage<Integer> {
for (Integer e : evts)
avg.increment(e, 1);
- GridStreamerWindow<Integer> win = ctx.window();
+ StreamerWindow<Integer> win = ctx.window();
win.enqueueAll(evts);