You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:20 UTC
[19/32] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f54e7bac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f54e7bac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f54e7bac
Branch: refs/heads/master
Commit: f54e7bac000230d2badfc50d543706d0413e37d6
Parents: cb9aa7a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:48:38 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:48:38 2014 +0300
----------------------------------------------------------------------
examples/config/example-streamer.xml | 6 +-
.../streaming/StreamingCheckInExample.java | 22 +-
.../StreamingPopularNumbersExample.java | 34 +-
.../streamer/GridStreamProcessor.java | 6 +-
.../gridgain/grid/streamer/StreamerWindow.java | 6 +-
.../grid/streamer/index/GridStreamerIndex.java | 297 ------
.../streamer/index/GridStreamerIndexEntry.java | 49 -
.../streamer/index/GridStreamerIndexPolicy.java | 42 -
.../index/GridStreamerIndexProvider.java | 99 --
.../index/GridStreamerIndexProviderAdapter.java | 788 ---------------
.../index/GridStreamerIndexProviderMBean.java | 66 --
.../index/GridStreamerIndexUpdateSync.java | 69 --
.../index/GridStreamerIndexUpdater.java | 80 --
.../grid/streamer/index/StreamerIndex.java | 297 ++++++
.../grid/streamer/index/StreamerIndexEntry.java | 49 +
.../streamer/index/StreamerIndexPolicy.java | 42 +
.../streamer/index/StreamerIndexProvider.java | 99 ++
.../index/StreamerIndexProviderAdapter.java | 788 +++++++++++++++
.../index/StreamerIndexProviderMBean.java | 66 ++
.../streamer/index/StreamerIndexUpdateSync.java | 69 ++
.../streamer/index/StreamerIndexUpdater.java | 80 ++
.../hash/GridStreamerHashIndexProvider.java | 493 ----------
.../index/hash/StreamerHashIndexProvider.java | 492 ++++++++++
.../tree/GridStreamerTreeIndexProvider.java | 946 -------------------
.../index/tree/StreamerTreeIndexProvider.java | 945 ++++++++++++++++++
.../streamer/window/StreamerWindowAdapter.java | 36 +-
.../average/spring-streamer-average-base.xml | 2 +-
.../GridStreamerLifecycleAwareSelfTest.java | 6 +-
.../index/GridStreamerIndexSelfTest.java | 108 +--
.../streamer/GridStreamerIndexLoadTest.java | 16 +-
.../loadtests/streamer/IndexUpdater.java | 6 +-
31 files changed, 3052 insertions(+), 3052 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml
index 8eb7cb7..f105770 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -115,7 +115,7 @@
<property name="indexes">
<list>
- <bean class="org.gridgain.grid.streamer.index.tree.GridStreamerTreeIndexProvider">
+ <bean class="org.gridgain.grid.streamer.index.tree.StreamerTreeIndexProvider">
<property name="updater">
<bean class="org.gridgain.examples.streaming.StreamingPopularNumbersExample$IndexUpdater"/>
</property>
@@ -225,7 +225,7 @@
-->
<property name="indexes">
<list>
- <bean class="org.gridgain.grid.streamer.index.hash.GridStreamerHashIndexProvider">
+ <bean class="org.gridgain.grid.streamer.index.hash.StreamerHashIndexProvider">
<property name="unique" value="true"/>
<property name="updater">
@@ -241,7 +241,7 @@
<property name="indexes">
<list>
- <bean class="org.gridgain.grid.streamer.index.hash.GridStreamerHashIndexProvider">
+ <bean class="org.gridgain.grid.streamer.index.hash.StreamerHashIndexProvider">
<property name="unique" value="true"/>
<property name="updater">
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
index f10dec1..34802c7 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
@@ -170,14 +170,14 @@ public class StreamingCheckInExample {
assert win != null;
- GridStreamerIndex<LocationInfo, String, Place> idxView = win.index();
+ StreamerIndex<LocationInfo, String, Place> idxView = win.index();
- Collection<GridStreamerIndexEntry<LocationInfo, String, Place>> entries =
+ Collection<StreamerIndexEntry<LocationInfo, String, Place>> entries =
idxView.entries(0);
Map<String, Place> ret = new HashMap<>(entries.size(), 1.0f);
- for (GridStreamerIndexEntry<LocationInfo, String, Place> e : entries)
+ for (StreamerIndexEntry<LocationInfo, String, Place> e : entries)
ret.put(e.key(), e.value());
return ret;
@@ -512,10 +512,10 @@ public class StreamingCheckInExample {
}
/**
- * Index updater for check-in events. Updaters are specified for {@link GridStreamerIndexProviderAdapter} in
+ * Index updater for check-in events. Updaters are specified for {@link org.gridgain.grid.streamer.index.StreamerIndexProviderAdapter} in
* streamer configuration.
*/
- private static class CheckInEventIndexUpdater implements GridStreamerIndexUpdater<CheckInEvent, String, Location> {
+ private static class CheckInEventIndexUpdater implements StreamerIndexUpdater<CheckInEvent, String, Location> {
/** {@inheritDoc} */
@Nullable @Override public String indexKey(CheckInEvent evt) {
return evt.userName(); // Index key is an event user name.
@@ -529,24 +529,24 @@ public class StreamingCheckInExample {
/** {@inheritDoc} */
@Nullable @Override public Location onAdded(
- GridStreamerIndexEntry<CheckInEvent, String, Location> entry,
+ StreamerIndexEntry<CheckInEvent, String, Location> entry,
CheckInEvent evt) throws GridException {
throw new AssertionError("onAdded() shouldn't be called on unique index.");
}
/** {@inheritDoc} */
@Nullable @Override public Location onRemoved(
- GridStreamerIndexEntry<CheckInEvent, String, Location> entry,
+ StreamerIndexEntry<CheckInEvent, String, Location> entry,
CheckInEvent evt) {
return null;
}
}
/**
- * Index updater for location info. Updaters are specified for {@link GridStreamerIndexProviderAdapter} in
+ * Index updater for location info. Updaters are specified for {@link org.gridgain.grid.streamer.index.StreamerIndexProviderAdapter} in
* streamer configuration.
*/
- private static class PlacesIndexUpdater implements GridStreamerIndexUpdater<LocationInfo, String, Place> {
+ private static class PlacesIndexUpdater implements StreamerIndexUpdater<LocationInfo, String, Place> {
/** {@inheritDoc} */
@Nullable @Override public String indexKey(LocationInfo info) {
return info.userName();
@@ -560,14 +560,14 @@ public class StreamingCheckInExample {
/** {@inheritDoc} */
@Nullable @Override public Place onAdded(
- GridStreamerIndexEntry<LocationInfo, String, Place> entry,
+ StreamerIndexEntry<LocationInfo, String, Place> entry,
LocationInfo evt) throws GridException {
throw new AssertionError("onAdded() shouldn't be called on unique index.");
}
/** {@inheritDoc} */
@Nullable @Override public Place onRemoved(
- GridStreamerIndexEntry<LocationInfo, String, Place> entry,
+ StreamerIndexEntry<LocationInfo, String, Place> entry,
LocationInfo evt) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
index 8794ba9..592afd8 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
@@ -49,22 +49,22 @@ public class StreamingPopularNumbersExample {
private static final int CNT = 10_000_000;
/** Comparator sorting random number entries by number popularity. */
- private static final Comparator<GridStreamerIndexEntry<Integer, Integer, Long>> CMP =
- new Comparator<GridStreamerIndexEntry<Integer, Integer, Long>>() {
- @Override public int compare(GridStreamerIndexEntry<Integer, Integer, Long> e1,
- GridStreamerIndexEntry<Integer, Integer, Long> e2) {
+ private static final Comparator<StreamerIndexEntry<Integer, Integer, Long>> CMP =
+ new Comparator<StreamerIndexEntry<Integer, Integer, Long>>() {
+ @Override public int compare(StreamerIndexEntry<Integer, Integer, Long> e1,
+ StreamerIndexEntry<Integer, Integer, Long> e2) {
return e2.value().compareTo(e1.value());
}
};
/** Reducer selecting first POPULAR_NUMBERS_CNT values. */
- private static class PopularNumbersReducer implements IgniteReducer<Collection<GridStreamerIndexEntry<Integer, Integer, Long>>,
- Collection<GridStreamerIndexEntry<Integer, Integer, Long>>> {
+ private static class PopularNumbersReducer implements IgniteReducer<Collection<StreamerIndexEntry<Integer, Integer, Long>>,
+ Collection<StreamerIndexEntry<Integer, Integer, Long>>> {
/** */
- private final List<GridStreamerIndexEntry<Integer, Integer, Long>> sorted = new ArrayList<>();
+ private final List<StreamerIndexEntry<Integer, Integer, Long>> sorted = new ArrayList<>();
/** {@inheritDoc} */
- @Override public boolean collect(@Nullable Collection<GridStreamerIndexEntry<Integer, Integer, Long>> col) {
+ @Override public boolean collect(@Nullable Collection<StreamerIndexEntry<Integer, Integer, Long>> col) {
if (col != null && !col.isEmpty())
// Add result from remote node to sorted set.
sorted.addAll(col);
@@ -73,7 +73,7 @@ public class StreamingPopularNumbersExample {
}
/** {@inheritDoc} */
- @Override public Collection<GridStreamerIndexEntry<Integer, Integer, Long>> reduce() {
+ @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> reduce() {
Collections.sort(sorted, CMP);
return sorted.subList(0, POPULAR_NUMBERS_CNT < sorted.size() ? POPULAR_NUMBERS_CNT : sorted.size());
@@ -158,13 +158,13 @@ public class StreamingPopularNumbersExample {
try {
// Send reduce query to all 'popular-numbers' streamers
// running on local and remote nodes.
- Collection<GridStreamerIndexEntry<Integer, Integer, Long>> col = streamer.context().reduce(
+ Collection<StreamerIndexEntry<Integer, Integer, Long>> col = streamer.context().reduce(
// This closure will execute on remote nodes.
new IgniteClosure<StreamerContext,
- Collection<GridStreamerIndexEntry<Integer, Integer, Long>>>() {
- @Override public Collection<GridStreamerIndexEntry<Integer, Integer, Long>> apply(
+ Collection<StreamerIndexEntry<Integer, Integer, Long>>>() {
+ @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> apply(
StreamerContext ctx) {
- GridStreamerIndex<Integer, Integer, Long> view = ctx.<Integer>window().index();
+ StreamerIndex<Integer, Integer, Long> view = ctx.<Integer>window().index();
return view.entries(-1 * POPULAR_NUMBERS_CNT);
}
@@ -173,7 +173,7 @@ public class StreamingPopularNumbersExample {
// that submitted the query.
new PopularNumbersReducer());
- for (GridStreamerIndexEntry<Integer, Integer, Long> cntr : col)
+ for (StreamerIndexEntry<Integer, Integer, Long> cntr : col)
System.out.printf("%3d=%d\n", cntr.key(), cntr.value());
System.out.println("----------------");
@@ -219,7 +219,7 @@ public class StreamingPopularNumbersExample {
/**
* This class will be set as part of window index configuration.
*/
- private static class IndexUpdater implements GridStreamerIndexUpdater<Integer, Integer, Long> {
+ private static class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> {
/** {@inheritDoc} */
@Override public Integer indexKey(Integer evt) {
// We use event as index key, so event and key are the same.
@@ -227,12 +227,12 @@ public class StreamingPopularNumbersExample {
}
/** {@inheritDoc} */
- @Nullable @Override public Long onAdded(GridStreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
+ @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
return entry.value() + 1;
}
/** {@inheritDoc} */
- @Nullable @Override public Long onRemoved(GridStreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
+ @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
return entry.value() - 1 == 0 ? null : entry.value() - 1;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
index 194f081..6a2de15 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
@@ -112,14 +112,14 @@ public class GridStreamProcessor extends GridProcessorAdapter {
}
if (win instanceof StreamerWindowAdapter) {
- GridStreamerIndexProvider[] idxs = ((StreamerWindowAdapter)win).indexProviders();
+ StreamerIndexProvider[] idxs = ((StreamerWindowAdapter)win).indexProviders();
if (idxs != null && idxs.length > 0) {
- for (GridStreamerIndexProvider idx : idxs) {
+ for (StreamerIndexProvider idx : idxs) {
try {
mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()),
"Window-" + win.name() + "-index-" + idx.name(), idx,
- GridStreamerIndexProviderMBean.class));
+ StreamerIndexProviderMBean.class));
if (log.isDebugEnabled())
log.debug("Registered MBean for streamer window index [streamer=" + s.name() +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
index 1a16ff5..6096ccc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
@@ -50,7 +50,7 @@ public interface StreamerWindow<E> extends Iterable<E> {
* @param <V> Type of the index value.
* @return Index with default name.
*/
- public <K, V> GridStreamerIndex<E, K, V> index();
+ public <K, V> StreamerIndex<E, K, V> index();
/**
* Gets index by name, if not index with such name was configured then
@@ -61,7 +61,7 @@ public interface StreamerWindow<E> extends Iterable<E> {
* @param <V> Type of the index value.
* @return Index with a given name.
*/
- public <K, V> GridStreamerIndex<E, K, V> index(@Nullable String name);
+ public <K, V> StreamerIndex<E, K, V> index(@Nullable String name);
/**
* Gets all indexes configured for this window.
@@ -69,7 +69,7 @@ public interface StreamerWindow<E> extends Iterable<E> {
* @return All indexes configured for this window or empty collection, if no
* indexes were configured.
*/
- public Collection<GridStreamerIndex<E, ?, ?>> indexes();
+ public Collection<StreamerIndex<E, ?, ?>> indexes();
/**
* Resets window. Usually will clear all events from window.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
deleted file mode 100644
index 618d800..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * User view on streamer index. Streamer indexes are used for fast look ups into streamer windows.
- * <p>
- * Streamer index can be accessed from {@link org.gridgain.grid.streamer.StreamerWindow} via any of the following methods:
- * <ul>
- * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index()}</li>
- * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index(String)}</li>
- * <li>{@link org.gridgain.grid.streamer.StreamerWindow#indexes()}</li>
- * </ul>
- * <p>
- * Indexes are created and provided for streamer windows by {@link GridStreamerIndexProvider} which is
- * specified in streamer configuration.
- * <h1 class="header">Example of how to use indexes</h1>
- * <p>
- * Stock price events are streamed into the system, the stock price event is an object containing stock symbol and price.
- * We need to get minimum price for GOOG instrument.
- * <p>
- * Here is {@link GridStreamerIndexUpdater} that maintains index values up to date:
- * <pre name="code" class="java">
- * class StockPriceIndexUpdater implements GridStreamerIndexUpdater<StockPriceEvent, String, Double> {
- * @Nullable @Override public String indexKey(StockPriceEvent evt) {
- * return evt.getSymbol(); // Symbol is an index key.
- * }
- *
- * @Nullable @Override public Double initialValue(StockPriceEvent evt, String key) {
- * return evt.getPrice(); // Set first event's price as an initial value.
- * }
- *
- * @Nullable @Override public Double onAdded(GridStreamerIndexEntry<StockPriceEvent, String, Double> entry,
- * StockPriceEvent evt) throws GridException {
- * return Math.min(entry.value(), evt.getPrice()); // Update the minimum on new event.
- * }
- *
- * @Nullable @Override
- * public Double onRemoved(GridStreamerIndexEntry<StockPriceEvent, String, Double> entry, StockPriceEvent evt) {
- * return entry.value(); // Leave minimum unchanged when event is evicted.
- * }
- * }
- * </pre>
- * <p>
- * Here is the code that queries minimum price for GOOG instrument using index:
- * <pre name="code" class="java">
- * double minGooglePrice = streamer.context().reduce(
- * // This closure will execute on remote nodes.
- * new GridClosure<GridStreamerContext, Double>() {
- * @Nullable @Override public Double apply(GridStreamerContext ctx) {
- * GridStreamerIndex<StockPriceEvent, String, Double> minIdx = ctx.<StockPriceEvent>window().index("min-price");
- *
- * return minIdx.entry("GOOG").value();
- * }
- * },
- * new GridReducer<Double, Double>() {
- * private double minPrice = Integer.MAX_VALUE;
- *
- * @Override public boolean collect(Double price) {
- * minPrice = Math.min(minPrice, price); // Take minimum price from all nodes.
- *
- * return true;
- * }
- *
- * @Override public Double reduce() {
- * return minPrice;
- * }
- * }
- * );
- * </pre>
- */
-public interface GridStreamerIndex<E, K, V> extends Iterable<GridStreamerIndexEntry<E, K, V>> {
- /**
- * Index name.
- *
- * @return Index name.
- */
- @Nullable public String name();
-
- /**
- * Gets index unique flag. If index is unique then exception
- * will be thrown if key is already present in the index.
- *
- * @return Index unique flag.
- */
- public boolean unique();
-
- /**
- * Returns {@code true} if index supports sorting and therefore can perform range operations.
- * <p>
- * Note that sorting happens by value and not by key.
- *
- * @return Index sorted flag.
- */
- public boolean sorted();
-
- /**
- * Gets index policy.
- *
- * @return Index policy.
- */
- public GridStreamerIndexPolicy policy();
-
- /**
- * @return Number entries in the index.
- */
- public int size();
-
- /**
- * Gets index entry for given key.
- *
- * @param key Key for which to retrieve entry.
- * @return Entry for given key, or {@code null} if one could not be found.
- */
- @Nullable public GridStreamerIndexEntry<E, K, V> entry(K key);
-
- /**
- * Gets read-only collection of entries in the index.
- * <p>
- * Returned collection is ordered for sorted indexes.
- *
- * @param cnt If 0 then all entries are returned,
- * if positive, then returned collection contains up to {@code cnt} elements
- * (in ascending order for sorted indexes),
- * if negative, then returned collection contains up to {@code |cnt|} elements
- * (in descending order for sorted indexes and not supported for unsorted indexes).
- * @return Collection of entries in the index.
- */
- public Collection<GridStreamerIndexEntry<E, K, V>> entries(int cnt);
-
- /**
- * Gets read-only set of index keys.
- * <p>
- * Returned collection is ordered for sorted indexes.
- *
- * @param cnt If 0 then all keys are returned,
- * if positive, then returned collection contains up to {@code cnt} elements
- * (in ascending order for sorted indexes),
- * if negative, then returned collection contains up to {@code |cnt|} elements
- * (in descending order for sorted indexes and not supported for unsorted indexes).
- * @return Read-only set of index keys within given position range.
- */
- public Set<K> keySet(int cnt);
-
- /**
- * Gets read-only collection of index values.
- * <p>
- * Returned collection is ordered for sorted indexes.
- *
- * @param cnt If 0 then all values are returned,
- * if positive, then returned collection contains up to {@code cnt} elements
- * (in ascending order for sorted indexes),
- * if negative, then returned collection contains up to {@code |cnt|} elements
- * (in descending order for sorted indexes and not supported for unsorted indexes).
- * @return Read-only collections of index values.
- */
- public Collection<V> values(int cnt);
-
- /**
- * Gets read-only collection of index events.
- * <p>
- * For sorted indexes events are guaranteed to be grouped by corresponding values, however
- * the order of the events corresponding to the same value is not defined.
- *
- * @param cnt If 0 then all values are returned,
- * if positive, then returned collection contains up to {@code cnt} elements
- * (in ascending order of values for sorted indexes),
- * if negative, then returned collection contains up to {@code |cnt|} elements
- * (in descending order of values for sorted indexes and not supported for unsorted indexes).
- * @return Read-only collections of index events.
- * @throws IllegalStateException If index is not configured to track events.
- * @see GridStreamerIndexPolicy
- */
- public Collection<E> events(int cnt);
-
- /**
- * Gets read-only set of index entries with given value.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @param val Value.
- * @return Read-only set of index entries with given value.
- */
- public Set<GridStreamerIndexEntry<E, K, V>> entrySet(V val);
-
- /**
- * Gets read-only set of index entries within given value range.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @param asc {@code True} for ascending set.
- * @param fromVal From value, if {@code null}, then start from beginning.
- * @param toVal To value, if {@code null} then include all entries until the end.
- * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
- * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
- * @return Read-only set of index entries within given value range.
- */
- public Set<GridStreamerIndexEntry<E, K, V>> entrySet(boolean asc, @Nullable V fromVal, boolean fromIncl,
- @Nullable V toVal, boolean toIncl);
-
- /**
- * Gets read-only set of index keys with given value. Iteration order over
- * this set has the same order as within index.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @param val Value.
- * @return Read-only set of index entries with given value.
- */
- public Set<K> keySet(V val);
-
- /**
- * Gets read-only set of index keys within given value range. Iteration order over
- * this set has the same order as within index.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @param asc {@code True} for ascending set.
- * @param fromVal From value, if {@code null}, then start from beginning.
- * @param toVal To value, if {@code null} then include all entries until the end.
- * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
- * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
- * @return Read-only set of index entries within given value range.
- */
- public Set<K> keySet(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
-
- /**
- * Gets read-only collection of index values within given value range. Iteration order over
- * this collection has the same order as within index.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @param asc {@code True} for ascending set.
- * @param fromVal From value, if {@code null}, then start from beginning.
- * @param toVal To value, if {@code null} then include all entries until the end.
- * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
- * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
- * @return Read-only set of index entries within given value range.
- */
- public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
-
- /**
- * Gets read-only collection of index events.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @param val From value, if {@code null}, then start from beginning.
- * @return Read-only set of index entries with given value.
- */
- public Collection<E> events(@Nullable V val);
-
- /**
- * Gets read-only collection of index events.
- * <p>
- * Events are guaranteed to be sorted by corresponding values, however
- * the order of the events corresponding to the same value is not defined.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @param asc {@code True} for ascending set.
- * @param fromVal From value, if {@code null}, then start from beginning.
- * @param toVal To value, if {@code null} then include all entries until the end.
- * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
- * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
- * @return Read-only set of index entries within given value range.
- */
- public Collection<E> events(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
-
- /**
- * Gets first entry in the index.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @return First entry in the index, or {@code null} if index is empty.
- */
- @Nullable public GridStreamerIndexEntry<E, K, V> firstEntry();
-
- /**
- * Gets last entry in the index.
- * <p>
- * This operation is only available for sorted indexes.
- *
- * @return Last entry in the index, or {@code null} if index is empty.
- */
- @Nullable public GridStreamerIndexEntry<E, K, V> lastEntry();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java
deleted file mode 100644
index dc90004..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer index entry. Individual index entry contains index key, value, and all events
- * associated with given key.
- *
- */
-public interface GridStreamerIndexEntry<E, K, V> {
- /**
- * Gets events associated with given index key and value.
- * <p>
- * Events are tracked only if {@link GridStreamerIndexProvider#getPolicy()}
- * is set to {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON} or
- * {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}.
- *
- * @return Events associated with given index key and value or {@code null} if event tracking is off.
- */
- @Nullable public Collection<E> events();
-
- /**
- * Gets index entry key.
- *
- * @return Index entry key.
- */
- public K key();
-
- /**
- * Gets index entry value.
- * <p>
- * For sorted indexes, the sorting happens based on this value.
- *
- * @return Index entry value.
- */
- public V value();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java
deleted file mode 100644
index 5fc7ae7..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-/**
- * Streamer index policy, which defines how events
- * are tracked within an index.
- */
-public enum GridStreamerIndexPolicy {
- /**
- * Do not track events.
- * <p>
- * Only a value, generated by {@link GridStreamerIndexUpdater},
- * will be stored in an index; event objects will be thrown away.
- */
- EVENT_TRACKING_OFF,
-
- /**
- * Track events.
- * <p>
- * All event objects will stored in an index along with the values,
- * generated by {@link GridStreamerIndexUpdater}.
- */
- EVENT_TRACKING_ON,
-
- /**
- * Track events with de-duplication.
- * <p>
- * All event objects will stored in an index along with the values,
- * generated by {@link GridStreamerIndexUpdater}. For duplicate (equal)
- * events, only a single event object will be stored, which corresponds
- * to a first event.
- */
- EVENT_TRACKING_ON_DEDUP
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
deleted file mode 100644
index ebcd610..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.gridgain.grid.*;
-
-/**
- * Represents an actual instance of an index. Used by a {@link org.gridgain.grid.streamer.StreamerWindow}
- * to perform event indexing.
- * <p>
- * To configure index for a streamer window, use
- * {@link org.gridgain.grid.streamer.window.StreamerWindowAdapter#setIndexes(GridStreamerIndexProvider[])}.
- */
-public interface GridStreamerIndexProvider<E, K, V> extends GridStreamerIndexProviderMBean {
- /**
- * Gets index name.
- *
- * @return Name of the index.
- */
- public String getName();
-
- /**
- * Gets user view for this index. This view is a snapshot
- * of a current index state. Once returned, it does not
- * change over time.
- *
- * @return User view for this index.
- */
- public GridStreamerIndex<E, K, V> index();
-
- /**
- * Initializes the index.
- */
- public void initialize();
-
- /**
- * Resets the index to an initial empty state.
- */
- public void reset();
-
- /**
- * Disposes the index.
- */
- public void dispose();
-
- /**
- * Adds an event to index.
- *
- * @param sync Index update synchronizer.
- * @param evt Event to add to an index.
- * @throws GridException If failed to add event to an index.
- */
- public void add(GridStreamerIndexUpdateSync sync, E evt) throws GridException;
-
- /**
- * Removes an event from index.
- *
- * @param sync Index update synchronizer.
- * @param evt Event to remove from index.
- * @throws GridException If failed to add event to an index.
- */
- public void remove(GridStreamerIndexUpdateSync sync, E evt) throws GridException;
-
- /**
- * Gets event indexing policy, which defines how events
- * are tracked within an index.
- *
- * @return index policy.
- */
- public GridStreamerIndexPolicy getPolicy();
-
- /**
- * Checks whether this index is unique or not. If it is, equal events
- * are not allowed, which means that if a newly-added event is found
- * to be equal to one of the already present events
- * ({@link Object#equals(Object)} returns {@code true}), an exception
- * is thrown.
- *
- * @return {@code True} for unique index.
- */
- public boolean isUnique();
-
- /**
- * Finalizes an update operation.
- *
- * @param sync Index update synchronizer.
- * @param evt Updated event.
- * @param rollback Rollback flag. If {@code true}, a rollback was made.
- * @param rmv Remove flag. If {@code true}, the event was removed from index.
- */
- public void endUpdate(GridStreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java
deleted file mode 100644
index f9b7bcd..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java
+++ /dev/null
@@ -1,788 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import com.romix.scala.*;
-import com.romix.scala.collection.concurrent.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-import org.pcollections.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.gridgain.grid.streamer.index.GridStreamerIndexPolicy.*;
-
-/**
- * Convenient {@link GridStreamerIndexProvider} adapter implementing base configuration methods.
- */
-public abstract class GridStreamerIndexProviderAdapter<E, K, V> implements GridStreamerIndexProvider<E, K, V> {
- /** */
- protected final IgniteClosure<GridStreamerIndexEntry<E, K, V>, V> entryToVal =
- new C1<GridStreamerIndexEntry<E, K, V>, V>() {
- @Override public V apply(GridStreamerIndexEntry<E, K, V> e) {
- return e.value();
- }
- };
-
- /** */
- protected final IgniteClosure<GridStreamerIndexEntry<E, K, V>, K> entryToKey =
- new C1<GridStreamerIndexEntry<E, K, V>, K>() {
- @Override public K apply(GridStreamerIndexEntry<E, K, V> e) {
- return e.key();
- }
- };
-
- /** Keys currently being updated. */
- private final ConcurrentMap<K, GridStreamerIndexUpdateSync> locks = new ConcurrentHashMap8<>();
-
- /** Index name. */
- private String name;
-
- /** Index policy. */
- private GridStreamerIndexPolicy plc = EVENT_TRACKING_OFF;
-
- /** Index updater. */
- private GridStreamerIndexUpdater<E, K, V> updater;
-
- /** */
- private final LongAdder evtsCnt = new LongAdder();
-
- /** Read write lock. */
- private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
-
- /** */
- private boolean unique;
-
- /** */
- private final ThreadLocal<K> threadLocKey = new ThreadLocal<>();
-
- /** */
- private final ConcurrentMap<IndexKey<V>, GridStreamerIndexUpdateSync> idxLocks = new ConcurrentHashMap8<>();
-
- /** */
- private boolean keyCheck = true;
-
- /**
- * Sets index name.
- *
- * @param name Index name.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /** {@inheritDoc} */
- @Override public String getName() {
- return name;
- }
-
- /**
- * Sets index policy.
- *
- * @param plc Policy.
- */
- public void setPolicy(GridStreamerIndexPolicy plc) {
- this.plc = plc;
- }
-
- /** {@inheritDoc} */
- @Override public GridStreamerIndexPolicy getPolicy() {
- return plc;
- }
-
- /**
- * Sets unique flag.
- *
- * @param unique {@code True} for unique index.
- */
- public void setUnique(boolean unique) {
- this.unique = unique;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isUnique() {
- return unique;
- }
-
- /**
- * Sets index updater.
- *
- * @param updater Updater.
- */
- public void setUpdater(GridStreamerIndexUpdater<E, K, V> updater) {
- this.updater = updater;
- }
-
- /**
- * Gets index updater.
- *
- * @return Updater.
- */
- public GridStreamerIndexUpdater<E, K, V> getUpdater() {
- return updater;
- }
-
- /** {@inheritDoc} */
- @Override public void dispose() {
- // No-op.
- }
-
- /**
- * Add event to the index.
- *
- * @param sync Sync.
- * @param evt Event.
- */
- @Override public void add(GridStreamerIndexUpdateSync sync, E evt) throws GridException {
- assert evt != null;
-
- if (threadLocKey.get() != null)
- throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get());
-
- K key = updater.indexKey(evt);
-
- if (key == null)
- return; // Ignore event.
-
- validateIndexKey(key);
-
- readLock();
-
- threadLocKey.set(key);
-
- lockKey(key, sync);
-
- add(evt, key, sync);
- }
-
- /**
- * Remove event from the index.
- *
- * @param sync Sync.
- * @param evt Event.
- */
- @Override public void remove(GridStreamerIndexUpdateSync sync, E evt) throws GridException {
- assert evt != null;
-
- if (threadLocKey.get() != null)
- throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get());
-
- K key = updater.indexKey(evt);
-
- assert key != null;
-
- validateIndexKey(key);
-
- readLock();
-
- threadLocKey.set(key);
-
- lockKey(key, sync);
-
- remove(evt, key, sync);
- }
-
- /** {@inheritDoc} */
- @Override public void endUpdate(GridStreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv) {
- K key = threadLocKey.get();
-
- if (key == null)
- return;
-
- if (!rollback) {
- if (rmv)
- evtsCnt.decrement();
- else
- evtsCnt.increment();
- }
-
- threadLocKey.remove();
-
- endUpdate0(sync, evt, key, rollback);
-
- unlockKey(key, sync);
-
- readUnlock();
- }
-
- /**
- * @param sync Sync.
- * @param evt Event.
- * @param key Key.
- * @param rollback Rollback flag.
- */
- protected abstract void endUpdate0(GridStreamerIndexUpdateSync sync, E evt, K key, boolean rollback);
-
- /** {@inheritDoc} */
- @Override public void reset() {
- writeLock();
-
- try {
- reset0();
- }
- finally {
- writeUnlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public GridStreamerIndex<E, K, V> index() {
- writeLock();
-
- try {
- return index0();
- }
- finally {
- writeUnlock();
- }
- }
-
- /**
- * Called on reset.
- */
- protected abstract void reset0();
-
- /**
- * @return Index
- */
- protected abstract GridStreamerIndex<E, K, V> index0();
-
- /**
- *
- */
- protected void readLock() {
- rwLock.readLock();
- }
-
- /**
- *
- */
- protected void readUnlock() {
- rwLock.readUnlock();
- }
-
- /**
- *
- */
- protected void writeLock() {
- rwLock.writeLock();
- }
-
- /**
- *
- */
- protected void writeUnlock() {
- rwLock.writeUnlock();
- }
-
- /**
- * @return Events count,
- */
- protected int eventsCount() {
- return evtsCnt.intValue();
- }
-
- /**
- * Add event to the index.
- *
- * @param evt Event.
- * @param key key.
- * @param sync Sync.
- * @throws GridException If failed.
- */
- protected abstract void add(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException;
-
- /**
- * Remove event from the index.
- *
- * @param evt Event.
- * @param key Key.
- * @param sync Sync.
- * @throws GridException If failed.
- */
- protected abstract void remove(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException;
-
- /**
- * Lock updates on particular key.
- *
- * @param key Key.
- * @param sync Sync.
- * @throws GridException If failed.
- */
- private void lockKey(K key, GridStreamerIndexUpdateSync sync) throws GridException {
- assert key != null;
- assert sync != null;
-
- while (true) {
- GridStreamerIndexUpdateSync old = locks.putIfAbsent(key, sync);
-
- if (old != null) {
- try {
- old.await();
- }
- catch (InterruptedException e) {
- throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e);
- }
-
- // No point to replace or remove sync here.
- // Owner will first remove it, then will finish the sync.
- }
- else
- break;
- }
- }
-
- /**
- * Unlock updates on particular key.
- *
- * @param key Key.
- * @param sync Sync.
- */
- private void unlockKey(K key, GridStreamerIndexUpdateSync sync) {
- assert key != null;
-
- locks.remove(key, sync);
- }
-
- /**
- * Lock updates on particular key.
- *
- * @param key Key.
- * @param sync Sync.
- * @throws GridException If failed.
- */
- protected void lockIndexKey(IndexKey<V> key, GridStreamerIndexUpdateSync sync) throws GridException {
- assert key != null;
- assert sync != null;
- assert isUnique();
-
- while (true) {
- GridStreamerIndexUpdateSync old = idxLocks.putIfAbsent(key, sync);
-
- if (old != null) {
- try {
- old.await();
- }
- catch (InterruptedException e) {
- throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e);
- }
-
- // No point to replace or remove sync here.
- // Owner will first remove it, then will finish the sync.
- }
- else
- break;
- }
- }
-
- /**
- * Unlock updates on particular key.
- *
- * @param key Key.
- * @param sync Sync.
- */
- protected void unlockIndexKey(IndexKey<V> key, GridStreamerIndexUpdateSync sync) {
- assert key != null;
- assert isUnique();
-
- idxLocks.remove(key, sync);
- }
-
- /**
- * @param key Key,
- * @param val Value.
- * @param idxKey Index key.
- * @param evt Event.
- * @return Entry.
- */
- protected Entry<E, K, V> newEntry(K key, V val, @Nullable IndexKey<V> idxKey, E evt) {
- GridStreamerIndexPolicy plc = getPolicy();
-
- switch (plc) {
- case EVENT_TRACKING_OFF:
- return new NonTrackingEntry<>(key, val, idxKey);
-
- case EVENT_TRACKING_ON:
- return new EventTrackingEntry<>(addToCollection(null, evt), key, val, idxKey);
-
- default:
- assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
-
- return new DedupTrackingEntry<>(addToMap(null, evt), key, val, idxKey);
- }
- }
-
- /**
- * @param oldEntry Old entry.
- * @param key Key,
- * @param val Value.
- * @param idxKey Index key.
- * @param evt Event.
- * @return Entry.
- */
- protected Entry<E, K, V> addEvent(GridStreamerIndexEntry<E,K,V> oldEntry, K key, V val,
- @Nullable IndexKey<V> idxKey, E evt) {
- GridStreamerIndexPolicy plc = getPolicy();
-
- switch (plc) {
- case EVENT_TRACKING_OFF:
- return new NonTrackingEntry<>(key, val, idxKey);
-
- case EVENT_TRACKING_ON:
- return new EventTrackingEntry<>(addToCollection(oldEntry.events(), evt), key, val, idxKey);
-
- default:
- assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
-
- return new DedupTrackingEntry<>(addToMap(((DedupTrackingEntry<E, K, V>)oldEntry).rawEvents(), evt),
- key, val, idxKey);
- }
- }
-
- /**
- * @param oldEntry Old entry.
- * @param key Key,
- * @param val Value.
- * @param idxKey Index key.
- * @param evt Event.
- * @return Entry.
- */
- protected Entry<E, K, V> removeEvent(GridStreamerIndexEntry<E, K, V> oldEntry, K key, V val,
- @Nullable IndexKey<V> idxKey, E evt) {
- GridStreamerIndexPolicy plc = getPolicy();
-
- switch (plc) {
- case EVENT_TRACKING_OFF:
- return new NonTrackingEntry<>(key, val, idxKey);
-
- case EVENT_TRACKING_ON:
- Collection<E> oldEvts = oldEntry.events();
-
- assert oldEvts != null; // Event tracking is on.
-
- Collection<E> newEvts = removeFromCollection(oldEvts, evt);
-
- return new EventTrackingEntry<>(newEvts != null ? newEvts : oldEvts, key, val, idxKey);
-
- default:
- assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
-
- Map<E, Integer> oldMap = ((DedupTrackingEntry<E, K, V>)oldEntry).rawEvents();
-
- assert oldMap != null; // Event tracking is on.
-
- Map<E, Integer> newMap = removeFromMap(oldMap, evt);
-
- return new DedupTrackingEntry<>(newMap != null ? newMap : oldMap, key, val, idxKey);
- }
- }
-
- /**
- * @param col Collection.
- * @param evt Event.
- * @return Cloned collection.
- */
- protected static <E> Collection<E> addToCollection(@Nullable Collection<E> col, E evt) {
- PVector<E> res = col == null ? TreePVector.<E>empty() : (PVector<E>)col;
-
- return res.plus(evt);
- }
-
- /**
- * @param map Collection.
- * @param evt Event.
- * @return Cloned map.
- */
- protected static <E> Map<E, Integer> addToMap(@Nullable Map<E, Integer> map, E evt) {
- HashPMap<E, Integer> res = map == null ? HashTreePMap.<E, Integer>empty() : (HashPMap<E, Integer>)map;
-
- Integer cnt = res.get(evt);
-
- return cnt != null ? res.minus(evt).plus(evt, cnt + 1) : res.plus(evt, 1);
- }
-
- /**
- * @param col Collection.
- * @param evt Event.
- * @return Cloned collection.
- */
- @Nullable protected static <E> Collection<E> removeFromCollection(@Nullable Collection<E> col, E evt) {
- if (col == null)
- return null;
-
- PVector<E> res = (PVector<E>)col;
-
- res = res.minus(evt);
-
- return res.isEmpty() ? null : res;
- }
-
- /**
- * @param map Collection.
- * @param evt Event.
- * @return Cloned map.
- */
- @Nullable protected static <E> Map<E, Integer> removeFromMap(@Nullable Map<E, Integer> map, E evt) {
- if (map == null)
- return null;
-
- HashPMap<E, Integer> res = (HashPMap<E, Integer>)map;
-
- Integer cnt = res.get(evt);
-
- return cnt == null ? res : cnt == 1 ? res.minus(evt) : res.minus(evt).plus(evt, cnt - 1);
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} */
- @Override public String updaterClass() {
- return updater.getClass().getName();
- }
-
- /** {@inheritDoc} */
- @Override public boolean unique() {
- return unique;
- }
-
- /** {@inheritDoc} */
- @Override public GridStreamerIndexPolicy policy() {
- return plc;
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return index0().size();
- }
-
- /**
- * Validates that given index key has overridden equals and hashCode methods.
- *
- * @param key Index key.
- * @throws IllegalArgumentException If validation fails.
- */
- private void validateIndexKey(@Nullable Object key) {
- if (keyCheck) {
- keyCheck = false;
-
- if (key == null)
- return;
-
- if (!U.overridesEqualsAndHashCode(key))
- throw new IllegalArgumentException("Index key must override hashCode() and equals() methods: " +
- key.getClass().getName());
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridStreamerIndexProviderAdapter.class, this);
- }
-
- /**
- * Streamer window index key.
- */
- protected interface IndexKey<V> {
- /**
- * @return Value associated with this key.
- */
- public V value();
- }
-
- /**
- * Utility method to safely get values from TrieMap.
- * See: https://github.com/romix/java-concurrent-hash-trie-map/issues/4
- *
- * @param key Key.
- * @param map Trie map.
- * @return Value from map.
- */
- @SuppressWarnings({"IfMayBeConditional", "TypeMayBeWeakened"})
- protected static <K, V> V trieGet(K key, TrieMap<K, V> map) {
- Object r = map.get(key);
-
- if(r instanceof Some)
- return ((Some<V>)r).get ();
- else if(r instanceof None)
- return null;
- else
- return (V)r;
- }
-
- /**
- * Streamer window index entry.
- */
- protected abstract static class Entry<E, K, V> implements GridStreamerIndexEntry<E, K, V> {
- /** */
- private final K key;
-
- /** */
- private final V val;
-
- /** */
- private final IndexKey<V> idxKey;
-
- /**
- * @param key Key.
- * @param val Value.
- * @param idxKey Key index.
- */
- Entry(K key, V val, @Nullable IndexKey<V> idxKey) {
- assert key != null;
- assert val != null;
- assert idxKey == null || idxKey.value() == val : "Keys are invalid [idxKey=" + idxKey + ", val=" + val +']';
-
- this.key = key;
- this.val = val;
- this.idxKey = idxKey;
- }
-
- /** {@inheritDoc} */
- @Override public K key() {
- return key;
- }
-
- /** {@inheritDoc} */
- @Override public V value() {
- return val;
- }
-
- /**
- * @return Internal key.
- */
- @Nullable public IndexKey<V> keyIndex() {
- return idxKey;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- if (!(obj instanceof Entry))
- return false;
-
- GridStreamerIndexEntry<E, K, V> e = (GridStreamerIndexEntry<E, K, V>)obj;
-
- return key.equals(e.key());
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return key.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(Entry.class, this, "identity", System.identityHashCode(this));
- }
- }
-
- /**
- * Entry with index policy {@link GridStreamerIndexPolicy#EVENT_TRACKING_OFF}.
- */
- protected static class NonTrackingEntry<E, K, V> extends Entry<E, K, V> {
- /**
- * @param key Key.
- * @param val Value.
- * @param idxKey Key index.
- */
- public NonTrackingEntry(K key, V val, @Nullable IndexKey<V> idxKey) {
- super(key, val, idxKey);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<E> events() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(NonTrackingEntry.class, this, super.toString());
- }
- }
-
- /**
- * Entry with index policy {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON}.
- */
- protected static class EventTrackingEntry<E, K, V> extends Entry<E, K, V> {
- /** */
- private final Collection<E> evts;
-
- /**
- * @param evts Events.
- * @param key Key.
- * @param val Value.
- * @param idxKey Key index.
- */
- public EventTrackingEntry(Collection<E> evts, K key, V val, @Nullable IndexKey<V> idxKey) {
- super(key, val, idxKey);
-
- assert evts == null || !evts.isEmpty() : "Invalid events: " + evts;
-
- this.evts = evts;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<E> events() {
- return evts;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(EventTrackingEntry.class, this, "evtCnt", evts.size(), "parent", super.toString());
- }
- }
-
- /**
- * Entry with index policy {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}.
- */
- protected static class DedupTrackingEntry<E, K, V> extends Entry<E, K, V> {
- /** */
- private final Map<E, Integer> evts;
-
- /**
- * @param evts Events.
- * @param key Key.
- * @param val Value.
- * @param idxKey Key index.
- */
- public DedupTrackingEntry(Map<E, Integer> evts, K key, V val, @Nullable IndexKey<V> idxKey) {
- super(key, val, idxKey);
-
- assert evts == null || !evts.isEmpty() : "Invalid events: " + evts;
-
- this.evts = evts;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<E> events() {
- return Collections.unmodifiableSet(evts.keySet());
- }
-
- /**
- * @return Events.
- */
- @Nullable public Map<E, Integer> rawEvents() {
- return evts;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(DedupTrackingEntry.class, this, "evtCnt", evts.size(), "parent", super.toString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java
deleted file mode 100644
index 5952673..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.apache.ignite.mbean.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Streamer window index provider MBean.
- */
-public interface GridStreamerIndexProviderMBean {
- /**
- * Index name.
- *
- * @return Index name.
- */
- @IgniteMBeanDescription("Index name.")
- @Nullable public String name();
-
- /**
- * Gets index updater class name.
- *
- * @return Index updater class.
- */
- @IgniteMBeanDescription("Index updater class name.")
- public String updaterClass();
-
- /**
- * Gets index unique flag.
- *
- * @return Index unique flag.
- */
- @IgniteMBeanDescription("Index unique flag.")
- public boolean unique();
-
- /**
- * Returns {@code true} if index supports sorting and therefore can perform range operations.
- *
- * @return Index sorted flag.
- */
- @IgniteMBeanDescription("Index sorted flag.")
- public boolean sorted();
-
- /**
- * Gets index policy.
- *
- * @return Index policy.
- */
- @IgniteMBeanDescription("Index policy.")
- public GridStreamerIndexPolicy policy();
-
- /**
- * Gets current index size.
- *
- * @return Current index size.
- */
- @IgniteMBeanDescription("Current index size.")
- public int size();
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java
deleted file mode 100644
index 1f708d0..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Streamer index update synchronizer.
- * <p>
- * Used in {@link GridStreamerIndexProvider} to synchronize
- * operations on window index.
- *
- * @see GridStreamerIndexProvider
- *
- */
-public class GridStreamerIndexUpdateSync {
- /** */
- private volatile int res;
-
- /**
- * Waits for a notification from another thread, which
- * should call {@link #finish(int)} with an operation result.
- * That result is returned by this method.
- *
- * @return Operation results, passed to {@link #finish(int)}.
- * @throws InterruptedException If wait was interrupted.
- */
- public int await() throws InterruptedException {
- int res0 = res;
-
- if (res0 == 0) {
- synchronized (this) {
- while ((res0 = res) == 0)
- wait();
- }
- }
-
- assert res0 != 0;
-
- return res0;
- }
-
- /**
- * Notifies all waiting threads to finish waiting.
- *
- * @param res Operation result to return from {@link #await()}.
- */
- public void finish(int res) {
- assert res != 0;
-
- synchronized (this) {
- this.res = res;
-
- notifyAll();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridStreamerIndexUpdateSync.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java
deleted file mode 100644
index 5f76bf6..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.gridgain.grid.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Index updater. The main responsibility of index updater is to maintain index values
- * up to date whenever events are added or removed from window.
- * <p>
- * Updater is provided to index provider in configuration usually via
- * {@link GridStreamerIndexProviderAdapter#setUpdater(GridStreamerIndexUpdater)} method.
- */
-public interface GridStreamerIndexUpdater<E, K, V> {
- /**
- * Given an event, extract index key. For example, if you have a 'Person' object
- * with field 'age' and need to index based on this field, then this method
- * should return the value of age field.
- * <p>
- * If {@code null} is returned then event will be ignored by the index.
- *
- * @param evt Event being added or removed from the window.
- * @return Index key for this event.
- */
- @Nullable public K indexKey(E evt);
-
- /**
- * Gets initial value for the index or {@code null} if event should be ignored.
- * This method is called every time when an entry is added to the window in
- * order to get initial value for given key.
- *
- * @param evt Event being added to or removed from window.
- * @param key Index key return by {@link #indexKey(Object)} method.
- * @return Initial value for given key, if {@code null} then event will be
- * ignored and index entry will not be created.
- */
- @Nullable public V initialValue(E evt, K key);
-
- /**
- * Callback invoked whenever an event is being added to the window. Given a key and
- * a current index value for this key, the implementation should return the new
- * value for this key. If returned value is {@code null}, then current entry will
- * be removed from the index.
- * <p>
- * If index is sorted, then sorting happens based on the returned value.
- *
- * @param entry Current index entry.
- * @param evt New event.
- * @return New index value for given key, if {@code null}, then current
- * index entry will be removed the index.
- * @throws GridException If entry should not be added to index (e.g. if uniqueness is violated).
- */
- @Nullable public V onAdded(GridStreamerIndexEntry<E, K, V> entry, E evt) throws GridException;
-
- /**
- * Callback invoked whenever an event is being removed from the window and has
- * index entry for given key. If there was no entry for given key, then
- * {@code onRemoved()} will not be called.
- * <p>
- * Given a key and a current index value for this key, the implementation should return the new
- * value for this key. If returned value is {@code null}, then current entry will
- * be removed from the index.
- * <p>
- * If index is sorted, then sorting happens based on the returned value.
- *
- * @param entry Current index entry.
- * @param evt Event being removed from the window.
- * @return New index value for given key, if {@code null}, then current
- * index entry will be removed the index.
- */
- @Nullable public V onRemoved(GridStreamerIndexEntry<E, K, V> entry, E evt);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java
new file mode 100644
index 0000000..5f069cb
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java
@@ -0,0 +1,297 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * User view on streamer index. Streamer indexes are used for fast look ups into streamer windows.
+ * <p>
+ * Streamer index can be accessed from {@link org.gridgain.grid.streamer.StreamerWindow} via any of the following methods:
+ * <ul>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index()}</li>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index(String)}</li>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#indexes()}</li>
+ * </ul>
+ * <p>
+ * Indexes are created and provided for streamer windows by {@link StreamerIndexProvider} which is
+ * specified in streamer configuration.
+ * <h1 class="header">Example of how to use indexes</h1>
+ * <p>
+ * Stock price events are streamed into the system, the stock price event is an object containing stock symbol and price.
+ * We need to get minimum price for GOOG instrument.
+ * <p>
+ * Here is {@link StreamerIndexUpdater} that maintains index values up to date:
+ * <pre name="code" class="java">
+ * class StockPriceIndexUpdater implements GridStreamerIndexUpdater<StockPriceEvent, String, Double> {
+ * @Nullable @Override public String indexKey(StockPriceEvent evt) {
+ * return evt.getSymbol(); // Symbol is an index key.
+ * }
+ *
+ * @Nullable @Override public Double initialValue(StockPriceEvent evt, String key) {
+ * return evt.getPrice(); // Set first event's price as an initial value.
+ * }
+ *
+ * @Nullable @Override public Double onAdded(GridStreamerIndexEntry<StockPriceEvent, String, Double> entry,
+ * StockPriceEvent evt) throws GridException {
+ * return Math.min(entry.value(), evt.getPrice()); // Update the minimum on new event.
+ * }
+ *
+ * @Nullable @Override
+ * public Double onRemoved(GridStreamerIndexEntry<StockPriceEvent, String, Double> entry, StockPriceEvent evt) {
+ * return entry.value(); // Leave minimum unchanged when event is evicted.
+ * }
+ * }
+ * </pre>
+ * <p>
+ * Here is the code that queries minimum price for GOOG instrument using index:
+ * <pre name="code" class="java">
+ * double minGooglePrice = streamer.context().reduce(
+ * // This closure will execute on remote nodes.
+ * new GridClosure<GridStreamerContext, Double>() {
+ * @Nullable @Override public Double apply(GridStreamerContext ctx) {
+ * GridStreamerIndex<StockPriceEvent, String, Double> minIdx = ctx.<StockPriceEvent>window().index("min-price");
+ *
+ * return minIdx.entry("GOOG").value();
+ * }
+ * },
+ * new GridReducer<Double, Double>() {
+ * private double minPrice = Integer.MAX_VALUE;
+ *
+ * @Override public boolean collect(Double price) {
+ * minPrice = Math.min(minPrice, price); // Take minimum price from all nodes.
+ *
+ * return true;
+ * }
+ *
+ * @Override public Double reduce() {
+ * return minPrice;
+ * }
+ * }
+ * );
+ * </pre>
+ */
+public interface StreamerIndex<E, K, V> extends Iterable<StreamerIndexEntry<E, K, V>> {
+ /**
+ * Index name.
+ *
+ * @return Index name.
+ */
+ @Nullable public String name();
+
+ /**
+ * Gets index unique flag. If index is unique then exception
+ * will be thrown if key is already present in the index.
+ *
+ * @return Index unique flag.
+ */
+ public boolean unique();
+
+ /**
+ * Returns {@code true} if index supports sorting and therefore can perform range operations.
+ * <p>
+ * Note that sorting happens by value and not by key.
+ *
+ * @return Index sorted flag.
+ */
+ public boolean sorted();
+
+ /**
+ * Gets index policy.
+ *
+ * @return Index policy.
+ */
+ public StreamerIndexPolicy policy();
+
+ /**
+ * @return Number entries in the index.
+ */
+ public int size();
+
+ /**
+ * Gets index entry for given key.
+ *
+ * @param key Key for which to retrieve entry.
+ * @return Entry for given key, or {@code null} if one could not be found.
+ */
+ @Nullable public StreamerIndexEntry<E, K, V> entry(K key);
+
+ /**
+ * Gets read-only collection of entries in the index.
+ * <p>
+ * Returned collection is ordered for sorted indexes.
+ *
+ * @param cnt If 0 then all entries are returned,
+ * if positive, then returned collection contains up to {@code cnt} elements
+ * (in ascending order for sorted indexes),
+ * if negative, then returned collection contains up to {@code |cnt|} elements
+ * (in descending order for sorted indexes and not supported for unsorted indexes).
+ * @return Collection of entries in the index.
+ */
+ public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt);
+
+ /**
+ * Gets read-only set of index keys.
+ * <p>
+ * Returned collection is ordered for sorted indexes.
+ *
+ * @param cnt If 0 then all keys are returned,
+ * if positive, then returned collection contains up to {@code cnt} elements
+ * (in ascending order for sorted indexes),
+ * if negative, then returned collection contains up to {@code |cnt|} elements
+ * (in descending order for sorted indexes and not supported for unsorted indexes).
+ * @return Read-only set of index keys within given position range.
+ */
+ public Set<K> keySet(int cnt);
+
+ /**
+ * Gets read-only collection of index values.
+ * <p>
+ * Returned collection is ordered for sorted indexes.
+ *
+ * @param cnt If 0 then all values are returned,
+ * if positive, then returned collection contains up to {@code cnt} elements
+ * (in ascending order for sorted indexes),
+ * if negative, then returned collection contains up to {@code |cnt|} elements
+ * (in descending order for sorted indexes and not supported for unsorted indexes).
+ * @return Read-only collections of index values.
+ */
+ public Collection<V> values(int cnt);
+
+ /**
+ * Gets read-only collection of index events.
+ * <p>
+ * For sorted indexes events are guaranteed to be grouped by corresponding values, however
+ * the order of the events corresponding to the same value is not defined.
+ *
+ * @param cnt If 0 then all values are returned,
+ * if positive, then returned collection contains up to {@code cnt} elements
+ * (in ascending order of values for sorted indexes),
+ * if negative, then returned collection contains up to {@code |cnt|} elements
+ * (in descending order of values for sorted indexes and not supported for unsorted indexes).
+ * @return Read-only collections of index events.
+ * @throws IllegalStateException If index is not configured to track events.
+ * @see StreamerIndexPolicy
+ */
+ public Collection<E> events(int cnt);
+
+ /**
+ * Gets read-only set of index entries with given value.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @param val Value.
+ * @return Read-only set of index entries with given value.
+ */
+ public Set<StreamerIndexEntry<E, K, V>> entrySet(V val);
+
+ /**
+ * Gets read-only set of index entries within given value range.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @param asc {@code True} for ascending set.
+ * @param fromVal From value, if {@code null}, then start from beginning.
+ * @param toVal To value, if {@code null} then include all entries until the end.
+ * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
+ * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
+ * @return Read-only set of index entries within given value range.
+ */
+ public Set<StreamerIndexEntry<E, K, V>> entrySet(boolean asc, @Nullable V fromVal, boolean fromIncl,
+ @Nullable V toVal, boolean toIncl);
+
+ /**
+ * Gets read-only set of index keys with given value. Iteration order over
+ * this set has the same order as within index.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @param val Value.
+ * @return Read-only set of index entries with given value.
+ */
+ public Set<K> keySet(V val);
+
+ /**
+ * Gets read-only set of index keys within given value range. Iteration order over
+ * this set has the same order as within index.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @param asc {@code True} for ascending set.
+ * @param fromVal From value, if {@code null}, then start from beginning.
+ * @param toVal To value, if {@code null} then include all entries until the end.
+ * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
+ * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
+ * @return Read-only set of index entries within given value range.
+ */
+ public Set<K> keySet(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
+
+ /**
+ * Gets read-only collection of index values within given value range. Iteration order over
+ * this collection has the same order as within index.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @param asc {@code True} for ascending set.
+ * @param fromVal From value, if {@code null}, then start from beginning.
+ * @param toVal To value, if {@code null} then include all entries until the end.
+ * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
+ * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
+ * @return Read-only set of index entries within given value range.
+ */
+ public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
+
+ /**
+ * Gets read-only collection of index events.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @param val From value, if {@code null}, then start from beginning.
+ * @return Read-only set of index entries with given value.
+ */
+ public Collection<E> events(@Nullable V val);
+
+ /**
+ * Gets read-only collection of index events.
+ * <p>
+ * Events are guaranteed to be sorted by corresponding values, however
+ * the order of the events corresponding to the same value is not defined.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @param asc {@code True} for ascending set.
+ * @param fromVal From value, if {@code null}, then start from beginning.
+ * @param toVal To value, if {@code null} then include all entries until the end.
+ * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
+ * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
+ * @return Read-only set of index entries within given value range.
+ */
+ public Collection<E> events(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
+
+ /**
+ * Gets first entry in the index.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @return First entry in the index, or {@code null} if index is empty.
+ */
+ @Nullable public StreamerIndexEntry<E, K, V> firstEntry();
+
+ /**
+ * Gets last entry in the index.
+ * <p>
+ * This operation is only available for sorted indexes.
+ *
+ * @return Last entry in the index, or {@code null} if index is empty.
+ */
+ @Nullable public StreamerIndexEntry<E, K, V> lastEntry();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java
new file mode 100644
index 0000000..4bbf056
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java
@@ -0,0 +1,49 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Streamer index entry. Individual index entry contains index key, value, and all events
+ * associated with given key.
+ *
+ */
+public interface StreamerIndexEntry<E, K, V> {
+ /**
+ * Gets events associated with given index key and value.
+ * <p>
+ * Events are tracked only if {@link StreamerIndexProvider#getPolicy()}
+ * is set to {@link StreamerIndexPolicy#EVENT_TRACKING_ON} or
+ * {@link StreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}.
+ *
+ * @return Events associated with given index key and value or {@code null} if event tracking is off.
+ */
+ @Nullable public Collection<E> events();
+
+ /**
+ * Gets index entry key.
+ *
+ * @return Index entry key.
+ */
+ public K key();
+
+ /**
+ * Gets index entry value.
+ * <p>
+ * For sorted indexes, the sorting happens based on this value.
+ *
+ * @return Index entry value.
+ */
+ public V value();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java
new file mode 100644
index 0000000..ce1bcb2
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java
@@ -0,0 +1,42 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+/**
+ * Streamer index policy, which defines how events
+ * are tracked within an index.
+ */
+public enum StreamerIndexPolicy {
+ /**
+ * Do not track events.
+ * <p>
+ * Only a value, generated by {@link StreamerIndexUpdater},
+ * will be stored in an index; event objects will be thrown away.
+ */
+ EVENT_TRACKING_OFF,
+
+ /**
+ * Track events.
+ * <p>
+ * All event objects will stored in an index along with the values,
+ * generated by {@link StreamerIndexUpdater}.
+ */
+ EVENT_TRACKING_ON,
+
+ /**
+ * Track events with de-duplication.
+ * <p>
+ * All event objects will stored in an index along with the values,
+ * generated by {@link StreamerIndexUpdater}. For duplicate (equal)
+ * events, only a single event object will be stored, which corresponds
+ * to a first event.
+ */
+ EVENT_TRACKING_ON_DEDUP
+}