You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:20 UTC

[19/32] incubator-ignite git commit: # Renaming

# Renaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f54e7bac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f54e7bac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f54e7bac

Branch: refs/heads/master
Commit: f54e7bac000230d2badfc50d543706d0413e37d6
Parents: cb9aa7a
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:48:38 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:48:38 2014 +0300

----------------------------------------------------------------------
 examples/config/example-streamer.xml            |   6 +-
 .../streaming/StreamingCheckInExample.java      |  22 +-
 .../StreamingPopularNumbersExample.java         |  34 +-
 .../streamer/GridStreamProcessor.java           |   6 +-
 .../gridgain/grid/streamer/StreamerWindow.java  |   6 +-
 .../grid/streamer/index/GridStreamerIndex.java  | 297 ------
 .../streamer/index/GridStreamerIndexEntry.java  |  49 -
 .../streamer/index/GridStreamerIndexPolicy.java |  42 -
 .../index/GridStreamerIndexProvider.java        |  99 --
 .../index/GridStreamerIndexProviderAdapter.java | 788 ---------------
 .../index/GridStreamerIndexProviderMBean.java   |  66 --
 .../index/GridStreamerIndexUpdateSync.java      |  69 --
 .../index/GridStreamerIndexUpdater.java         |  80 --
 .../grid/streamer/index/StreamerIndex.java      | 297 ++++++
 .../grid/streamer/index/StreamerIndexEntry.java |  49 +
 .../streamer/index/StreamerIndexPolicy.java     |  42 +
 .../streamer/index/StreamerIndexProvider.java   |  99 ++
 .../index/StreamerIndexProviderAdapter.java     | 788 +++++++++++++++
 .../index/StreamerIndexProviderMBean.java       |  66 ++
 .../streamer/index/StreamerIndexUpdateSync.java |  69 ++
 .../streamer/index/StreamerIndexUpdater.java    |  80 ++
 .../hash/GridStreamerHashIndexProvider.java     | 493 ----------
 .../index/hash/StreamerHashIndexProvider.java   | 492 ++++++++++
 .../tree/GridStreamerTreeIndexProvider.java     | 946 -------------------
 .../index/tree/StreamerTreeIndexProvider.java   | 945 ++++++++++++++++++
 .../streamer/window/StreamerWindowAdapter.java  |  36 +-
 .../average/spring-streamer-average-base.xml    |   2 +-
 .../GridStreamerLifecycleAwareSelfTest.java     |   6 +-
 .../index/GridStreamerIndexSelfTest.java        | 108 +--
 .../streamer/GridStreamerIndexLoadTest.java     |  16 +-
 .../loadtests/streamer/IndexUpdater.java        |   6 +-
 31 files changed, 3052 insertions(+), 3052 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml
index 8eb7cb7..f105770 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -115,7 +115,7 @@
 
                             <property name="indexes">
                                 <list>
-                                    <bean class="org.gridgain.grid.streamer.index.tree.GridStreamerTreeIndexProvider">
+                                    <bean class="org.gridgain.grid.streamer.index.tree.StreamerTreeIndexProvider">
                                         <property name="updater">
                                             <bean class="org.gridgain.examples.streaming.StreamingPopularNumbersExample$IndexUpdater"/>
                                         </property>
@@ -225,7 +225,7 @@
                                 -->
                                 <property name="indexes">
                                     <list>
-                                        <bean class="org.gridgain.grid.streamer.index.hash.GridStreamerHashIndexProvider">
+                                        <bean class="org.gridgain.grid.streamer.index.hash.StreamerHashIndexProvider">
                                             <property name="unique" value="true"/>
 
                                             <property name="updater">
@@ -241,7 +241,7 @@
 
                                 <property name="indexes">
                                     <list>
-                                        <bean class="org.gridgain.grid.streamer.index.hash.GridStreamerHashIndexProvider">
+                                        <bean class="org.gridgain.grid.streamer.index.hash.StreamerHashIndexProvider">
                                             <property name="unique" value="true"/>
 
                                             <property name="updater">

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
index f10dec1..34802c7 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingCheckInExample.java
@@ -170,14 +170,14 @@ public class StreamingCheckInExample {
 
                                 assert win != null;
 
-                                GridStreamerIndex<LocationInfo, String, Place> idxView = win.index();
+                                StreamerIndex<LocationInfo, String, Place> idxView = win.index();
 
-                                Collection<GridStreamerIndexEntry<LocationInfo, String, Place>> entries =
+                                Collection<StreamerIndexEntry<LocationInfo, String, Place>> entries =
                                     idxView.entries(0);
 
                                 Map<String, Place> ret = new HashMap<>(entries.size(), 1.0f);
 
-                                for (GridStreamerIndexEntry<LocationInfo, String, Place> e : entries)
+                                for (StreamerIndexEntry<LocationInfo, String, Place> e : entries)
                                     ret.put(e.key(), e.value());
 
                                 return ret;
@@ -512,10 +512,10 @@ public class StreamingCheckInExample {
     }
 
     /**
-     * Index updater for check-in events. Updaters are specified for {@link GridStreamerIndexProviderAdapter} in
+     * Index updater for check-in events. Updaters are specified for {@link org.gridgain.grid.streamer.index.StreamerIndexProviderAdapter} in
      * streamer configuration.
      */
-    private static class CheckInEventIndexUpdater implements GridStreamerIndexUpdater<CheckInEvent, String, Location> {
+    private static class CheckInEventIndexUpdater implements StreamerIndexUpdater<CheckInEvent, String, Location> {
         /** {@inheritDoc} */
         @Nullable @Override public String indexKey(CheckInEvent evt) {
             return evt.userName(); // Index key is an event user name.
@@ -529,24 +529,24 @@ public class StreamingCheckInExample {
 
         /** {@inheritDoc} */
         @Nullable @Override public Location onAdded(
-            GridStreamerIndexEntry<CheckInEvent, String, Location> entry,
+            StreamerIndexEntry<CheckInEvent, String, Location> entry,
             CheckInEvent evt) throws GridException {
             throw new AssertionError("onAdded() shouldn't be called on unique index.");
         }
 
         /** {@inheritDoc} */
         @Nullable @Override public Location onRemoved(
-            GridStreamerIndexEntry<CheckInEvent, String, Location> entry,
+            StreamerIndexEntry<CheckInEvent, String, Location> entry,
             CheckInEvent evt) {
             return null;
         }
     }
 
     /**
-     * Index updater for location info. Updaters are specified for {@link GridStreamerIndexProviderAdapter} in
+     * Index updater for location info. Updaters are specified for {@link org.gridgain.grid.streamer.index.StreamerIndexProviderAdapter} in
      * streamer configuration.
      */
-    private static class PlacesIndexUpdater implements GridStreamerIndexUpdater<LocationInfo, String, Place> {
+    private static class PlacesIndexUpdater implements StreamerIndexUpdater<LocationInfo, String, Place> {
         /** {@inheritDoc} */
         @Nullable @Override public String indexKey(LocationInfo info) {
             return info.userName();
@@ -560,14 +560,14 @@ public class StreamingCheckInExample {
 
         /** {@inheritDoc} */
         @Nullable @Override public Place onAdded(
-            GridStreamerIndexEntry<LocationInfo, String, Place> entry,
+            StreamerIndexEntry<LocationInfo, String, Place> entry,
             LocationInfo evt) throws GridException {
             throw new AssertionError("onAdded() shouldn't be called on unique index.");
         }
 
         /** {@inheritDoc} */
         @Nullable @Override public Place onRemoved(
-            GridStreamerIndexEntry<LocationInfo, String, Place> entry,
+            StreamerIndexEntry<LocationInfo, String, Place> entry,
             LocationInfo evt) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
index 8794ba9..592afd8 100644
--- a/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
+++ b/examples/src/main/java/org/gridgain/examples/streaming/StreamingPopularNumbersExample.java
@@ -49,22 +49,22 @@ public class StreamingPopularNumbersExample {
     private static final int CNT = 10_000_000;
 
     /** Comparator sorting random number entries by number popularity. */
-    private static final Comparator<GridStreamerIndexEntry<Integer, Integer, Long>> CMP =
-        new Comparator<GridStreamerIndexEntry<Integer, Integer, Long>>() {
-            @Override public int compare(GridStreamerIndexEntry<Integer, Integer, Long> e1,
-                GridStreamerIndexEntry<Integer, Integer, Long> e2) {
+    private static final Comparator<StreamerIndexEntry<Integer, Integer, Long>> CMP =
+        new Comparator<StreamerIndexEntry<Integer, Integer, Long>>() {
+            @Override public int compare(StreamerIndexEntry<Integer, Integer, Long> e1,
+                StreamerIndexEntry<Integer, Integer, Long> e2) {
                 return e2.value().compareTo(e1.value());
             }
         };
 
     /** Reducer selecting first POPULAR_NUMBERS_CNT values. */
-    private static class PopularNumbersReducer implements IgniteReducer<Collection<GridStreamerIndexEntry<Integer, Integer, Long>>,
-            Collection<GridStreamerIndexEntry<Integer, Integer, Long>>> {
+    private static class PopularNumbersReducer implements IgniteReducer<Collection<StreamerIndexEntry<Integer, Integer, Long>>,
+            Collection<StreamerIndexEntry<Integer, Integer, Long>>> {
         /** */
-        private final List<GridStreamerIndexEntry<Integer, Integer, Long>> sorted = new ArrayList<>();
+        private final List<StreamerIndexEntry<Integer, Integer, Long>> sorted = new ArrayList<>();
 
         /** {@inheritDoc} */
-        @Override public boolean collect(@Nullable Collection<GridStreamerIndexEntry<Integer, Integer, Long>> col) {
+        @Override public boolean collect(@Nullable Collection<StreamerIndexEntry<Integer, Integer, Long>> col) {
             if (col != null && !col.isEmpty())
                 // Add result from remote node to sorted set.
                 sorted.addAll(col);
@@ -73,7 +73,7 @@ public class StreamingPopularNumbersExample {
         }
 
         /** {@inheritDoc} */
-        @Override public Collection<GridStreamerIndexEntry<Integer, Integer, Long>> reduce() {
+        @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> reduce() {
             Collections.sort(sorted, CMP);
 
             return sorted.subList(0, POPULAR_NUMBERS_CNT < sorted.size() ? POPULAR_NUMBERS_CNT : sorted.size());
@@ -158,13 +158,13 @@ public class StreamingPopularNumbersExample {
                 try {
                     // Send reduce query to all 'popular-numbers' streamers
                     // running on local and remote nodes.
-                    Collection<GridStreamerIndexEntry<Integer, Integer, Long>> col = streamer.context().reduce(
+                    Collection<StreamerIndexEntry<Integer, Integer, Long>> col = streamer.context().reduce(
                         // This closure will execute on remote nodes.
                         new IgniteClosure<StreamerContext,
-                                                                            Collection<GridStreamerIndexEntry<Integer, Integer, Long>>>() {
-                            @Override public Collection<GridStreamerIndexEntry<Integer, Integer, Long>> apply(
+                                                                            Collection<StreamerIndexEntry<Integer, Integer, Long>>>() {
+                            @Override public Collection<StreamerIndexEntry<Integer, Integer, Long>> apply(
                                 StreamerContext ctx) {
-                                GridStreamerIndex<Integer, Integer, Long> view = ctx.<Integer>window().index();
+                                StreamerIndex<Integer, Integer, Long> view = ctx.<Integer>window().index();
 
                                 return view.entries(-1 * POPULAR_NUMBERS_CNT);
                             }
@@ -173,7 +173,7 @@ public class StreamingPopularNumbersExample {
                         // that submitted the query.
                         new PopularNumbersReducer());
 
-                    for (GridStreamerIndexEntry<Integer, Integer, Long> cntr : col)
+                    for (StreamerIndexEntry<Integer, Integer, Long> cntr : col)
                         System.out.printf("%3d=%d\n", cntr.key(), cntr.value());
 
                     System.out.println("----------------");
@@ -219,7 +219,7 @@ public class StreamingPopularNumbersExample {
     /**
      * This class will be set as part of window index configuration.
      */
-    private static class IndexUpdater implements GridStreamerIndexUpdater<Integer, Integer, Long> {
+    private static class IndexUpdater implements StreamerIndexUpdater<Integer, Integer, Long> {
         /** {@inheritDoc} */
         @Override public Integer indexKey(Integer evt) {
             // We use event as index key, so event and key are the same.
@@ -227,12 +227,12 @@ public class StreamingPopularNumbersExample {
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Long onAdded(GridStreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
+        @Nullable @Override public Long onAdded(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
             return entry.value() + 1;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Long onRemoved(GridStreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
+        @Nullable @Override public Long onRemoved(StreamerIndexEntry<Integer, Integer, Long> entry, Integer evt) {
             return entry.value() - 1 == 0 ? null : entry.value() - 1;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
index 194f081..6a2de15 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
@@ -112,14 +112,14 @@ public class GridStreamProcessor extends GridProcessorAdapter {
                 }
 
                 if (win instanceof StreamerWindowAdapter) {
-                    GridStreamerIndexProvider[] idxs = ((StreamerWindowAdapter)win).indexProviders();
+                    StreamerIndexProvider[] idxs = ((StreamerWindowAdapter)win).indexProviders();
 
                     if (idxs != null && idxs.length > 0) {
-                        for (GridStreamerIndexProvider idx : idxs) {
+                        for (StreamerIndexProvider idx : idxs) {
                             try {
                                 mBeans.add(U.registerMBean(mBeanSrv, ctx.gridName(), U.maskName(s.name()),
                                     "Window-" + win.name() + "-index-" + idx.name(), idx,
-                                    GridStreamerIndexProviderMBean.class));
+                                    StreamerIndexProviderMBean.class));
 
                                 if (log.isDebugEnabled())
                                     log.debug("Registered MBean for streamer window index [streamer=" + s.name() +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
index 1a16ff5..6096ccc 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
@@ -50,7 +50,7 @@ public interface StreamerWindow<E> extends Iterable<E> {
      * @param <V> Type of the index value.
      * @return Index with default name.
      */
-     public <K, V> GridStreamerIndex<E, K, V> index();
+     public <K, V> StreamerIndex<E, K, V> index();
 
     /**
      * Gets index by name, if not index with such name was configured then
@@ -61,7 +61,7 @@ public interface StreamerWindow<E> extends Iterable<E> {
      * @param <V> Type of the index value.
      * @return Index with a given name.
      */
-    public <K, V> GridStreamerIndex<E, K, V> index(@Nullable String name);
+    public <K, V> StreamerIndex<E, K, V> index(@Nullable String name);
 
     /**
      * Gets all indexes configured for this window.
@@ -69,7 +69,7 @@ public interface StreamerWindow<E> extends Iterable<E> {
      * @return All indexes configured for this window or empty collection, if no
      *         indexes were configured.
      */
-    public Collection<GridStreamerIndex<E, ?, ?>> indexes();
+    public Collection<StreamerIndex<E, ?, ?>> indexes();
 
     /**
      * Resets window. Usually will clear all events from window.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
deleted file mode 100644
index 618d800..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndex.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * User view on streamer index. Streamer indexes are used for fast look ups into streamer windows.
- * <p>
- * Streamer index can be accessed from {@link org.gridgain.grid.streamer.StreamerWindow} via any of the following methods:
- * <ul>
- * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index()}</li>
- * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index(String)}</li>
- * <li>{@link org.gridgain.grid.streamer.StreamerWindow#indexes()}</li>
- * </ul>
- * <p>
- * Indexes are created and provided for streamer windows by {@link GridStreamerIndexProvider} which is
- * specified in streamer configuration.
- * <h1 class="header">Example of how to use indexes</h1>
- * <p>
- * Stock price events are streamed into the system, the stock price event is an object containing stock symbol and price.
- * We need to get minimum price for GOOG instrument.
- * <p>
- * Here is {@link GridStreamerIndexUpdater} that maintains index values up to date:
- * <pre name="code" class="java">
- * class StockPriceIndexUpdater implements GridStreamerIndexUpdater&lt;StockPriceEvent, String, Double&gt; {
- *     &#64;Nullable &#64;Override public String indexKey(StockPriceEvent evt) {
- *         return evt.getSymbol(); // Symbol is an index key.
- *     }
- *
- *     &#64;Nullable &#64;Override public Double initialValue(StockPriceEvent evt, String key) {
- *         return evt.getPrice(); // Set first event's price as an initial value.
- *     }
- *
- *     &#64;Nullable &#64;Override public Double onAdded(GridStreamerIndexEntry&lt;StockPriceEvent, String, Double&gt; entry,
- *         StockPriceEvent evt) throws GridException {
- *         return Math.min(entry.value(), evt.getPrice()); // Update the minimum on new event.
- *     }
- *
- *     &#64;Nullable &#64;Override
- *     public Double onRemoved(GridStreamerIndexEntry&lt;StockPriceEvent, String, Double&gt; entry, StockPriceEvent evt) {
- *         return entry.value(); // Leave minimum unchanged when event is evicted.
- *     }
- * }
- * </pre>
- * <p>
- * Here is the code that queries minimum price for GOOG instrument using index:
- * <pre name="code" class="java">
- * double minGooglePrice = streamer.context().reduce(
- *     // This closure will execute on remote nodes.
- *     new GridClosure&lt;GridStreamerContext, Double&gt;() {
- *         &#64;Nullable &#64;Override public Double apply(GridStreamerContext ctx) {
- *             GridStreamerIndex&lt;StockPriceEvent, String, Double&gt; minIdx = ctx.&lt;StockPriceEvent&gt;window().index("min-price");
- *
- *             return minIdx.entry("GOOG").value();
- *         }
- *     },
- *     new GridReducer&lt;Double, Double&gt;() {
- *         private double minPrice = Integer.MAX_VALUE;
- *
- *         &#64;Override public boolean collect(Double price) {
- *             minPrice = Math.min(minPrice, price); // Take minimum price from all nodes.
- *
- *             return true;
- *         }
- *
- *         &#64;Override public Double reduce() {
- *             return minPrice;
- *         }
- *     }
- * );
- * </pre>
- */
-public interface GridStreamerIndex<E, K, V> extends Iterable<GridStreamerIndexEntry<E, K, V>> {
-    /**
-     * Index name.
-     *
-     * @return Index name.
-     */
-    @Nullable public String name();
-
-    /**
-     * Gets index unique flag. If index is unique then exception
-     * will be thrown if key is already present in the index.
-     *
-     * @return Index unique flag.
-     */
-    public boolean unique();
-
-    /**
-     * Returns {@code true} if index supports sorting and therefore can perform range operations.
-     * <p>
-     * Note that sorting happens by value and not by key.
-     *
-     * @return Index sorted flag.
-     */
-    public boolean sorted();
-
-    /**
-     * Gets index policy.
-     *
-     * @return Index policy.
-     */
-    public GridStreamerIndexPolicy policy();
-
-    /**
-     * @return Number entries in the index.
-     */
-    public int size();
-
-    /**
-     * Gets index entry for given key.
-     *
-     * @param key Key for which to retrieve entry.
-     * @return Entry for given key, or {@code null} if one could not be found.
-     */
-    @Nullable public GridStreamerIndexEntry<E, K, V> entry(K key);
-
-    /**
-     * Gets read-only collection of entries in the index.
-     * <p>
-     * Returned collection is ordered for sorted indexes.
-     *
-     * @param cnt If 0 then all entries are returned,
-     *      if positive, then returned collection contains up to {@code cnt} elements
-     *      (in ascending order for sorted indexes),
-     *      if negative, then returned collection contains up to {@code |cnt|} elements
-     *      (in descending order for sorted indexes and not supported for unsorted indexes).
-     * @return Collection of entries in the index.
-     */
-    public Collection<GridStreamerIndexEntry<E, K, V>> entries(int cnt);
-
-    /**
-     * Gets read-only set of index keys.
-     * <p>
-     * Returned collection is ordered for sorted indexes.
-     *
-     * @param cnt If 0 then all keys are returned,
-     *      if positive, then returned collection contains up to {@code cnt} elements
-     *      (in ascending order for sorted indexes),
-     *      if negative, then returned collection contains up to {@code |cnt|} elements
-     *      (in descending order for sorted indexes and not supported for unsorted indexes).
-     * @return Read-only set of index keys within given position range.
-     */
-    public Set<K> keySet(int cnt);
-
-    /**
-     * Gets read-only collection of index values.
-     * <p>
-     * Returned collection is ordered for sorted indexes.
-     *
-     * @param cnt If 0 then all values are returned,
-     *      if positive, then returned collection contains up to {@code cnt} elements
-     *      (in ascending order for sorted indexes),
-     *      if negative, then returned collection contains up to {@code |cnt|} elements
-     *      (in descending order for sorted indexes and not supported for unsorted indexes).
-     * @return Read-only collections of index values.
-     */
-    public Collection<V> values(int cnt);
-
-    /**
-     * Gets read-only collection of index events.
-     * <p>
-     * For sorted indexes events are guaranteed to be grouped by corresponding values, however
-     * the order of the events corresponding to the same value is not defined.
-     *
-     * @param cnt If 0 then all values are returned,
-     *      if positive, then returned collection contains up to {@code cnt} elements
-     *      (in ascending order of values for sorted indexes),
-     *      if negative, then returned collection contains up to {@code |cnt|} elements
-     *      (in descending order of values for sorted indexes and not supported for unsorted indexes).
-     * @return Read-only collections of index events.
-     * @throws IllegalStateException If index is not configured to track events.
-     * @see GridStreamerIndexPolicy
-     */
-    public Collection<E> events(int cnt);
-
-    /**
-     * Gets read-only set of index entries with given value.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @param val Value.
-     * @return Read-only set of index entries with given value.
-     */
-    public Set<GridStreamerIndexEntry<E, K, V>> entrySet(V val);
-
-    /**
-     * Gets read-only set of index entries within given value range.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @param asc {@code True} for ascending set.
-     * @param fromVal From value, if {@code null}, then start from beginning.
-     * @param toVal To value, if {@code null} then include all entries until the end.
-     * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
-     * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
-     * @return Read-only set of index entries within given value range.
-     */
-    public Set<GridStreamerIndexEntry<E, K, V>> entrySet(boolean asc, @Nullable V fromVal, boolean fromIncl,
-        @Nullable V toVal, boolean toIncl);
-
-    /**
-     * Gets read-only set of index keys with given value. Iteration order over
-     * this set has the same order as within index.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @param val Value.
-     * @return Read-only set of index entries with given value.
-     */
-    public Set<K> keySet(V val);
-
-    /**
-     * Gets read-only set of index keys within given value range. Iteration order over
-     * this set has the same order as within index.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @param asc {@code True} for ascending set.
-     * @param fromVal From value, if {@code null}, then start from beginning.
-     * @param toVal To value, if {@code null} then include all entries until the end.
-     * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
-     * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
-     * @return Read-only set of index entries within given value range.
-     */
-    public Set<K> keySet(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
-
-    /**
-     * Gets read-only collection of index values within given value range. Iteration order over
-     * this collection has the same order as within index.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @param asc {@code True} for ascending set.
-     * @param fromVal From value, if {@code null}, then start from beginning.
-     * @param toVal To value, if {@code null} then include all entries until the end.
-     * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
-     * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
-     * @return Read-only set of index entries within given value range.
-     */
-    public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
-
-    /**
-     * Gets read-only collection of index events.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @param val From value, if {@code null}, then start from beginning.
-     * @return Read-only set of index entries with given value.
-     */
-    public Collection<E> events(@Nullable V val);
-
-    /**
-     * Gets read-only collection of index events.
-     * <p>
-     * Events are guaranteed to be sorted by corresponding values, however
-     * the order of the events corresponding to the same value is not defined.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @param asc {@code True} for ascending set.
-     * @param fromVal From value, if {@code null}, then start from beginning.
-     * @param toVal To value, if {@code null} then include all entries until the end.
-     * @param fromIncl Whether or not left value is inclusive (ignored if {@code minVal} is {@code null}).
-     * @param toIncl Whether or not right value is inclusive (ignored if {@code maxVal} is {@code null}).
-     * @return Read-only set of index entries within given value range.
-     */
-    public Collection<E> events(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
-
-    /**
-     * Gets first entry in the index.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @return First entry in the index, or {@code null} if index is empty.
-     */
-    @Nullable public GridStreamerIndexEntry<E, K, V> firstEntry();
-
-    /**
-     * Gets last entry in the index.
-     * <p>
-     * This operation is only available for sorted indexes.
-     *
-     * @return Last entry in the index, or {@code null} if index is empty.
-     */
-    @Nullable public GridStreamerIndexEntry<E, K, V> lastEntry();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java
deleted file mode 100644
index dc90004..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexEntry.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer index entry. Individual index entry contains index key, value, and all events
- * associated with given key.
- *
- */
-public interface GridStreamerIndexEntry<E, K, V> {
-    /**
-     * Gets events associated with given index key and value.
-     * <p>
-     * Events are tracked only if {@link GridStreamerIndexProvider#getPolicy()}
-     * is set to {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON} or
-     * {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}.
-     *
-     * @return Events associated with given index key and value or {@code null} if event tracking is off.
-     */
-    @Nullable public Collection<E> events();
-
-    /**
-     * Gets index entry key.
-     *
-     * @return Index entry key.
-     */
-    public K key();
-
-    /**
-     * Gets index entry value.
-     * <p>
-     * For sorted indexes, the sorting happens based on this value.
-     *
-     * @return Index entry value.
-     */
-    public V value();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java
deleted file mode 100644
index 5fc7ae7..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexPolicy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-/**
- * Streamer index policy, which defines how events
- * are tracked within an index.
- */
-public enum GridStreamerIndexPolicy {
-    /**
-     * Do not track events.
-     * <p>
-     * Only a value, generated by {@link GridStreamerIndexUpdater},
-     * will be stored in an index; event objects will be thrown away.
-     */
-    EVENT_TRACKING_OFF,
-
-    /**
-     * Track events.
-     * <p>
-     * All event objects will be stored in an index along with the values,
-     * generated by {@link GridStreamerIndexUpdater}.
-     */
-    EVENT_TRACKING_ON,
-
-    /**
-     * Track events with de-duplication.
-     * <p>
-     * All event objects will be stored in an index along with the values,
-     * generated by {@link GridStreamerIndexUpdater}. For duplicate (equal)
-     * events, only a single event object will be stored, which corresponds
-     * to a first event.
-     */
-    EVENT_TRACKING_ON_DEDUP
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
deleted file mode 100644
index ebcd610..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.gridgain.grid.*;
-
-/**
- * Represents an actual instance of an index. Used by a {@link org.gridgain.grid.streamer.StreamerWindow}
- * to perform event indexing.
- * <p>
- * To configure index for a streamer window, use
- * {@link org.gridgain.grid.streamer.window.StreamerWindowAdapter#setIndexes(GridStreamerIndexProvider[])}.
- */
-public interface GridStreamerIndexProvider<E, K, V> extends GridStreamerIndexProviderMBean {
-    /**
-     * Gets index name.
-     *
-     * @return Name of the index.
-     */
-    public String getName();
-
-    /**
-     * Gets user view for this index. This view is a snapshot
-     * of a current index state. Once returned, it does not
-     * change over time.
-     *
-     * @return User view for this index.
-     */
-    public GridStreamerIndex<E, K, V> index();
-
-    /**
-     * Initializes the index.
-     */
-    public void initialize();
-
-    /**
-     * Resets the index to an initial empty state.
-     */
-    public void reset();
-
-    /**
-     * Disposes the index.
-     */
-    public void dispose();
-
-    /**
-     * Adds an event to index.
-     *
-     * @param sync Index update synchronizer.
-     * @param evt Event to add to an index.
-     * @throws GridException If failed to add event to an index.
-     */
-    public void add(GridStreamerIndexUpdateSync sync, E evt) throws GridException;
-
-    /**
-     * Removes an event from index.
-     *
-     * @param sync Index update synchronizer.
-     * @param evt Event to remove from index.
-     * @throws GridException If failed to remove event from an index.
-     */
-    public void remove(GridStreamerIndexUpdateSync sync, E evt) throws GridException;
-
-    /**
-     * Gets event indexing policy, which defines how events
-     * are tracked within an index.
-     *
-     * @return index policy.
-     */
-    public GridStreamerIndexPolicy getPolicy();
-
-    /**
-     * Checks whether this index is unique or not. If it is, equal events
-     * are not allowed, which means that if a newly-added event is found
-     * to be equal to one of the already present events
-     * ({@link Object#equals(Object)} returns {@code true}), an exception
-     * is thrown.
-     *
-     * @return {@code True} for unique index.
-     */
-    public boolean isUnique();
-
-    /**
-     * Finalizes an update operation.
-     *
-     * @param sync Index update synchronizer.
-     * @param evt Updated event.
-     * @param rollback Rollback flag. If {@code true}, a rollback was made.
-     * @param rmv Remove flag. If {@code true}, the event was removed from index.
-     */
-    public void endUpdate(GridStreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java
deleted file mode 100644
index f9b7bcd..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderAdapter.java
+++ /dev/null
@@ -1,788 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import com.romix.scala.*;
-import com.romix.scala.collection.concurrent.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-import org.pcollections.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.gridgain.grid.streamer.index.GridStreamerIndexPolicy.*;
-
-/**
- * Convenient {@link GridStreamerIndexProvider} adapter implementing base configuration methods.
- */
-public abstract class GridStreamerIndexProviderAdapter<E, K, V> implements GridStreamerIndexProvider<E, K, V> {
-    /** */
-    protected final IgniteClosure<GridStreamerIndexEntry<E, K, V>, V> entryToVal =
-        new C1<GridStreamerIndexEntry<E, K, V>, V>() {
-            @Override public V apply(GridStreamerIndexEntry<E, K, V> e) {
-                return e.value();
-            }
-        };
-
-    /** */
-    protected final IgniteClosure<GridStreamerIndexEntry<E, K, V>, K> entryToKey =
-        new C1<GridStreamerIndexEntry<E, K, V>, K>() {
-            @Override public K apply(GridStreamerIndexEntry<E, K, V> e) {
-                return e.key();
-            }
-        };
-
-    /** Keys currently being updated. */
-    private final ConcurrentMap<K, GridStreamerIndexUpdateSync> locks = new ConcurrentHashMap8<>();
-
-    /** Index name. */
-    private String name;
-
-    /** Index policy. */
-    private GridStreamerIndexPolicy plc = EVENT_TRACKING_OFF;
-
-    /** Index updater. */
-    private GridStreamerIndexUpdater<E, K, V> updater;
-
-    /** */
-    private final LongAdder evtsCnt = new LongAdder();
-
-    /** Read write lock. */
-    private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
-
-    /** */
-    private boolean unique;
-
-    /** */
-    private final ThreadLocal<K> threadLocKey = new ThreadLocal<>();
-
-    /** */
-    private final ConcurrentMap<IndexKey<V>, GridStreamerIndexUpdateSync> idxLocks = new ConcurrentHashMap8<>();
-
-    /** */
-    private boolean keyCheck = true;
-
-    /**
-     * Sets index name.
-     *
-     * @param name Index name.
-     */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getName() {
-        return name;
-    }
-
-    /**
-     * Sets index policy.
-     *
-     * @param plc Policy.
-     */
-    public void setPolicy(GridStreamerIndexPolicy plc) {
-        this.plc = plc;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridStreamerIndexPolicy getPolicy() {
-        return plc;
-    }
-
-    /**
-     * Sets unique flag.
-     *
-     * @param unique {@code True} for unique index.
-     */
-    public void setUnique(boolean unique) {
-        this.unique = unique;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isUnique() {
-        return unique;
-    }
-
-    /**
-     * Sets index updater.
-     *
-     * @param updater Updater.
-     */
-    public void setUpdater(GridStreamerIndexUpdater<E, K, V> updater) {
-        this.updater = updater;
-    }
-
-    /**
-     * Gets index updater.
-     *
-     * @return Updater.
-     */
-    public GridStreamerIndexUpdater<E, K, V> getUpdater() {
-        return updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void dispose() {
-        // No-op.
-    }
-
-    /**
-     * Add event to the index.
-     *
-     * @param sync Sync.
-     * @param evt Event.
-     */
-    @Override public void add(GridStreamerIndexUpdateSync sync, E evt) throws GridException {
-        assert evt != null;
-
-        if (threadLocKey.get() != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get());
-
-        K key = updater.indexKey(evt);
-
-        if (key == null)
-            return; // Ignore event.
-
-        validateIndexKey(key);
-
-        readLock();
-
-        threadLocKey.set(key);
-
-        lockKey(key, sync);
-
-        add(evt, key, sync);
-    }
-
-    /**
-     * Remove event from the index.
-     *
-     * @param sync Sync.
-     * @param evt Event.
-     */
-    @Override public void remove(GridStreamerIndexUpdateSync sync, E evt) throws GridException {
-        assert evt != null;
-
-        if (threadLocKey.get() != null)
-            throw new IllegalStateException("Previous operation has not been finished: " + threadLocKey.get());
-
-        K key = updater.indexKey(evt);
-
-        assert key != null;
-
-        validateIndexKey(key);
-
-        readLock();
-
-        threadLocKey.set(key);
-
-        lockKey(key, sync);
-
-        remove(evt, key, sync);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void endUpdate(GridStreamerIndexUpdateSync sync, E evt, boolean rollback, boolean rmv) {
-        K key = threadLocKey.get();
-
-        if (key == null)
-            return;
-
-        if (!rollback) {
-            if (rmv)
-                evtsCnt.decrement();
-            else
-                evtsCnt.increment();
-        }
-
-        threadLocKey.remove();
-
-        endUpdate0(sync, evt, key, rollback);
-
-        unlockKey(key, sync);
-
-        readUnlock();
-    }
-
-    /**
-     * @param sync Sync.
-     * @param evt Event.
-     * @param key Key.
-     * @param rollback Rollback flag.
-     */
-    protected abstract void endUpdate0(GridStreamerIndexUpdateSync sync, E evt, K key, boolean rollback);
-
-    /** {@inheritDoc} */
-    @Override public void reset() {
-        writeLock();
-
-        try {
-            reset0();
-        }
-        finally {
-            writeUnlock();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridStreamerIndex<E, K, V> index() {
-        writeLock();
-
-        try {
-            return index0();
-        }
-        finally {
-            writeUnlock();
-        }
-    }
-
-    /**
-     * Called on reset.
-     */
-    protected abstract void reset0();
-
-    /**
-     * @return Index
-     */
-    protected abstract GridStreamerIndex<E, K, V> index0();
-
-    /**
-     *
-     */
-    protected void readLock() {
-        rwLock.readLock();
-    }
-
-    /**
-     *
-     */
-    protected void readUnlock() {
-        rwLock.readUnlock();
-    }
-
-    /**
-     *
-     */
-    protected void writeLock() {
-        rwLock.writeLock();
-    }
-
-    /**
-     *
-     */
-    protected void writeUnlock() {
-        rwLock.writeUnlock();
-    }
-
-    /**
-     * @return Events count.
-     */
-    protected int eventsCount() {
-        return evtsCnt.intValue();
-    }
-
-    /**
-     * Add event to the index.
-     *
-     * @param evt Event.
-     * @param key Key.
-     * @param sync Sync.
-     * @throws GridException If failed.
-     */
-    protected abstract void add(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException;
-
-    /**
-     * Remove event from the index.
-     *
-     * @param evt Event.
-     * @param key Key.
-     * @param sync Sync.
-     * @throws GridException If failed.
-     */
-    protected abstract void remove(E evt, K key, GridStreamerIndexUpdateSync sync) throws GridException;
-
-    /**
-     * Lock updates on particular key.
-     *
-     * @param key Key.
-     * @param sync Sync.
-     * @throws GridException If failed.
-     */
-    private void lockKey(K key, GridStreamerIndexUpdateSync sync) throws GridException {
-        assert key != null;
-        assert sync != null;
-
-        while (true) {
-            GridStreamerIndexUpdateSync old = locks.putIfAbsent(key, sync);
-
-            if (old != null) {
-                try {
-                    old.await();
-                }
-                catch (InterruptedException e) {
-                    throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e);
-                }
-
-                // No point to replace or remove sync here.
-                // Owner will first remove it, then will finish the sync.
-            }
-            else
-                break;
-        }
-    }
-
-    /**
-     * Unlock updates on particular key.
-     *
-     * @param key Key.
-     * @param sync Sync.
-     */
-    private void unlockKey(K key, GridStreamerIndexUpdateSync sync) {
-        assert key != null;
-
-        locks.remove(key, sync);
-    }
-
-    /**
-     * Lock updates on particular key.
-     *
-     * @param key Key.
-     * @param sync Sync.
-     * @throws GridException If failed.
-     */
-    protected void lockIndexKey(IndexKey<V> key, GridStreamerIndexUpdateSync sync) throws GridException {
-        assert key != null;
-        assert sync != null;
-        assert isUnique();
-
-        while (true) {
-            GridStreamerIndexUpdateSync old = idxLocks.putIfAbsent(key, sync);
-
-            if (old != null) {
-                try {
-                    old.await();
-                }
-                catch (InterruptedException e) {
-                    throw new GridException("Failed to lock on key (thread has been interrupted): " + key, e);
-                }
-
-                // No point to replace or remove sync here.
-                // Owner will first remove it, then will finish the sync.
-            }
-            else
-                break;
-        }
-    }
-
-    /**
-     * Unlock updates on particular key.
-     *
-     * @param key Key.
-     * @param sync Sync.
-     */
-    protected void unlockIndexKey(IndexKey<V> key, GridStreamerIndexUpdateSync sync) {
-        assert key != null;
-        assert isUnique();
-
-        idxLocks.remove(key, sync);
-    }
-
-    /**
-     * @param key Key.
-     * @param val Value.
-     * @param idxKey Index key.
-     * @param evt Event.
-     * @return Entry.
-     */
-    protected Entry<E, K, V> newEntry(K key, V val, @Nullable IndexKey<V> idxKey, E evt) {
-        GridStreamerIndexPolicy plc = getPolicy();
-
-        switch (plc) {
-            case EVENT_TRACKING_OFF:
-                return new NonTrackingEntry<>(key, val, idxKey);
-
-            case EVENT_TRACKING_ON:
-                return new EventTrackingEntry<>(addToCollection(null, evt), key, val, idxKey);
-
-            default:
-                assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
-
-                return new DedupTrackingEntry<>(addToMap(null, evt), key, val, idxKey);
-        }
-    }
-
-    /**
-     * @param oldEntry Old entry.
-     * @param key Key.
-     * @param val Value.
-     * @param idxKey Index key.
-     * @param evt Event.
-     * @return Entry.
-     */
-    protected Entry<E, K, V> addEvent(GridStreamerIndexEntry<E,K,V> oldEntry, K key, V val,
-        @Nullable IndexKey<V> idxKey, E evt) {
-        GridStreamerIndexPolicy plc = getPolicy();
-
-        switch (plc) {
-            case EVENT_TRACKING_OFF:
-                return new NonTrackingEntry<>(key, val, idxKey);
-
-            case EVENT_TRACKING_ON:
-                return new EventTrackingEntry<>(addToCollection(oldEntry.events(), evt), key, val, idxKey);
-
-            default:
-                assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
-
-                return new DedupTrackingEntry<>(addToMap(((DedupTrackingEntry<E, K, V>)oldEntry).rawEvents(), evt),
-                    key, val, idxKey);
-        }
-    }
-
-    /**
-     * @param oldEntry Old entry.
-     * @param key Key.
-     * @param val Value.
-     * @param idxKey Index key.
-     * @param evt Event.
-     * @return Entry.
-     */
-    protected Entry<E, K, V> removeEvent(GridStreamerIndexEntry<E, K, V> oldEntry, K key, V val,
-        @Nullable IndexKey<V> idxKey, E evt) {
-        GridStreamerIndexPolicy plc = getPolicy();
-
-        switch (plc) {
-            case EVENT_TRACKING_OFF:
-                return new NonTrackingEntry<>(key, val, idxKey);
-
-            case EVENT_TRACKING_ON:
-                Collection<E> oldEvts = oldEntry.events();
-
-                assert oldEvts != null; // Event tracking is on.
-
-                Collection<E> newEvts = removeFromCollection(oldEvts, evt);
-
-                return new EventTrackingEntry<>(newEvts != null ? newEvts : oldEvts, key, val, idxKey);
-
-            default:
-                assert plc == EVENT_TRACKING_ON_DEDUP : "Unknown policy: " + plc;
-
-                Map<E, Integer> oldMap = ((DedupTrackingEntry<E, K, V>)oldEntry).rawEvents();
-
-                assert oldMap != null; // Event tracking is on.
-
-                Map<E, Integer> newMap = removeFromMap(oldMap, evt);
-
-                return new DedupTrackingEntry<>(newMap != null ? newMap : oldMap, key, val, idxKey);
-        }
-    }
-
-    /**
-     * @param col Collection.
-     * @param evt Event.
-     * @return Cloned collection.
-     */
-    protected static <E> Collection<E> addToCollection(@Nullable Collection<E> col, E evt) {
-        PVector<E> res = col == null ? TreePVector.<E>empty() : (PVector<E>)col;
-
-        return res.plus(evt);
-    }
-
-    /**
-     * @param map Map of events to their occurrence counts.
-     * @param evt Event.
-     * @return Cloned map.
-     */
-    protected static <E> Map<E, Integer> addToMap(@Nullable Map<E, Integer> map, E evt) {
-        HashPMap<E, Integer> res = map == null ? HashTreePMap.<E, Integer>empty() : (HashPMap<E, Integer>)map;
-
-        Integer cnt = res.get(evt);
-
-        return cnt != null ? res.minus(evt).plus(evt, cnt + 1) : res.plus(evt, 1);
-    }
-
-    /**
-     * @param col Collection.
-     * @param evt Event.
-     * @return Cloned collection, or {@code null} if {@code col} is {@code null} or no events remain.
-     */
-    @Nullable protected static <E> Collection<E> removeFromCollection(@Nullable Collection<E> col, E evt) {
-        if (col == null)
-            return null;
-
-        PVector<E> res = (PVector<E>)col;
-
-        res = res.minus(evt);
-
-        return res.isEmpty() ? null : res;
-    }
-
-    /**
-     * @param map Map of events to their occurrence counts.
-     * @param evt Event.
-     * @return Cloned map.
-     */
-    @Nullable protected static <E> Map<E, Integer> removeFromMap(@Nullable Map<E, Integer> map, E evt) {
-        if (map == null)
-            return null;
-
-        HashPMap<E, Integer> res = (HashPMap<E, Integer>)map;
-
-        Integer cnt = res.get(evt);
-
-        return cnt == null ? res : cnt == 1 ? res.minus(evt) : res.minus(evt).plus(evt, cnt - 1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String name() {
-        return name;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String updaterClass() {
-        return updater.getClass().getName();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean unique() {
-        return unique;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridStreamerIndexPolicy policy() {
-        return plc;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return index0().size();
-    }
-
-    /**
-     * Validates that given index key has overridden equals and hashCode methods.
-     *
-     * @param key Index key.
-     * @throws IllegalArgumentException If validation fails.
-     */
-    private void validateIndexKey(@Nullable Object key) {
-        if (keyCheck) {
-            keyCheck = false;
-
-            if (key == null)
-                return;
-
-            if (!U.overridesEqualsAndHashCode(key))
-                throw new IllegalArgumentException("Index key must override hashCode() and equals() methods: " +
-                    key.getClass().getName());
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerIndexProviderAdapter.class, this);
-    }
-
-    /**
-     * Streamer window index key.
-     */
-    protected interface IndexKey<V> {
-        /**
-         * @return Value associated with this key.
-         */
-        public V value();
-    }
-
-    /**
-     * Utility method to safely get values from TrieMap.
-     * See: https://github.com/romix/java-concurrent-hash-trie-map/issues/4
-     *
-     * @param key Key.
-     * @param map Trie map.
-     * @return Value from map.
-     */
-    @SuppressWarnings({"IfMayBeConditional", "TypeMayBeWeakened"})
-    protected static <K, V> V trieGet(K key, TrieMap<K, V> map) {
-        Object r = map.get(key);
-
-        if(r instanceof Some)
-            return ((Some<V>)r).get ();
-        else if(r instanceof None)
-            return null;
-        else
-            return (V)r;
-    }
-
-    /**
-     * Streamer window index entry.
-     */
-    protected abstract static class Entry<E, K, V> implements GridStreamerIndexEntry<E, K, V> {
-        /** */
-        private final K key;
-
-        /** */
-        private final V val;
-
-        /** */
-        private final IndexKey<V> idxKey;
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         * @param idxKey Key index.
-         */
-        Entry(K key, V val, @Nullable IndexKey<V> idxKey) {
-            assert key != null;
-            assert val != null;
-            assert idxKey == null || idxKey.value() == val : "Keys are invalid [idxKey=" + idxKey + ", val=" + val +']';
-
-            this.key = key;
-            this.val = val;
-            this.idxKey = idxKey;
-        }
-
-        /** {@inheritDoc} */
-        @Override public K key() {
-            return key;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V value() {
-            return val;
-        }
-
-        /**
-         * @return Internal key.
-         */
-        @Nullable public IndexKey<V> keyIndex() {
-            return idxKey;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (!(obj instanceof Entry))
-                return false;
-
-            GridStreamerIndexEntry<E, K, V> e = (GridStreamerIndexEntry<E, K, V>)obj;
-
-            return key.equals(e.key());
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return key.hashCode();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(Entry.class, this, "identity", System.identityHashCode(this));
-        }
-    }
-
-    /**
-     * Entry with index policy {@link GridStreamerIndexPolicy#EVENT_TRACKING_OFF}.
-     */
-    protected static class NonTrackingEntry<E, K, V> extends Entry<E, K, V> {
-        /**
-         * @param key Key.
-         * @param val Value.
-         * @param idxKey Key index.
-         */
-        public NonTrackingEntry(K key, V val, @Nullable IndexKey<V> idxKey) {
-            super(key, val, idxKey);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(NonTrackingEntry.class, this, super.toString());
-        }
-    }
-
-    /**
-     * Entry with index policy {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON}.
-     */
-    protected static class EventTrackingEntry<E, K, V> extends Entry<E, K, V> {
-        /** */
-        private final Collection<E> evts;
-
-        /**
-         * @param evts Events.
-         * @param key Key.
-         * @param val Value.
-         * @param idxKey Key index.
-         */
-        public EventTrackingEntry(Collection<E> evts, K key, V val, @Nullable IndexKey<V> idxKey) {
-            super(key, val, idxKey);
-
-            assert evts == null || !evts.isEmpty() : "Invalid events: " + evts;
-
-            this.evts = evts;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events() {
-            return evts;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(EventTrackingEntry.class, this, "evtCnt", evts.size(), "parent", super.toString());
-        }
-    }
-
-    /**
-     * Entry with index policy {@link GridStreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}.
-     */
-    protected static class DedupTrackingEntry<E, K, V> extends Entry<E, K, V> {
-        /** */
-        private final Map<E, Integer> evts;
-
-        /**
-         * @param evts Events.
-         * @param key Key.
-         * @param val Value.
-         * @param idxKey Key index.
-         */
-        public DedupTrackingEntry(Map<E, Integer> evts, K key, V val, @Nullable IndexKey<V> idxKey) {
-            super(key, val, idxKey);
-
-            assert evts == null || !evts.isEmpty() : "Invalid events: " + evts;
-
-            this.evts = evts;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Collection<E> events() {
-            return Collections.unmodifiableSet(evts.keySet());
-        }
-
-        /**
-         * @return Events.
-         */
-        @Nullable public Map<E, Integer> rawEvents() {
-            return evts;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(DedupTrackingEntry.class, this, "evtCnt", evts.size(), "parent", super.toString());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java
deleted file mode 100644
index 5952673..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProviderMBean.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.apache.ignite.mbean.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Streamer window index provider MBean.
- */
-public interface GridStreamerIndexProviderMBean {
-    /**
-     * Index name.
-     *
-     * @return Index name.
-     */
-    @IgniteMBeanDescription("Index name.")
-    @Nullable public String name();
-
-    /**
-     * Gets index updater class name.
-     *
-     * @return Index updater class.
-     */
-    @IgniteMBeanDescription("Index updater class name.")
-    public String updaterClass();
-
-    /**
-     * Gets index unique flag.
-     *
-     * @return Index unique flag.
-     */
-    @IgniteMBeanDescription("Index unique flag.")
-    public boolean unique();
-
-    /**
-     * Returns {@code true} if index supports sorting and therefore can perform range operations.
-     *
-     * @return Index sorted flag.
-     */
-    @IgniteMBeanDescription("Index sorted flag.")
-    public boolean sorted();
-
-    /**
-     * Gets index policy.
-     *
-     * @return Index policy.
-     */
-    @IgniteMBeanDescription("Index policy.")
-    public GridStreamerIndexPolicy policy();
-
-    /**
-     * Gets current index size.
-     *
-     * @return Current index size.
-     */
-    @IgniteMBeanDescription("Current index size.")
-    public int size();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java
deleted file mode 100644
index 1f708d0..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdateSync.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Streamer index update synchronizer.
- * <p>
- * Used in {@link GridStreamerIndexProvider} to synchronize
- * operations on window index.
- *
- * @see GridStreamerIndexProvider
- *
- */
-public class GridStreamerIndexUpdateSync {
-    /** */
-    private volatile int res;
-
-    /**
-     * Waits for a notification from another thread, which
-     * should call {@link #finish(int)} with an operation result.
-     * That result is returned by this method.
-     *
-     * @return Operation results, passed to {@link #finish(int)}.
-     * @throws InterruptedException If wait was interrupted.
-     */
-    public int await() throws InterruptedException {
-        int res0 = res;
-
-        if (res0 == 0) {
-            synchronized (this) {
-                while ((res0 = res) == 0)
-                    wait();
-            }
-        }
-
-        assert res0 != 0;
-
-        return res0;
-    }
-
-    /**
-     * Notifies all waiting threads to finish waiting.
-     *
-     * @param res Operation result to return from {@link #await()}.
-     */
-    public void finish(int res) {
-        assert res != 0;
-
-        synchronized (this) {
-            this.res = res;
-
-            notifyAll();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridStreamerIndexUpdateSync.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java
deleted file mode 100644
index 5f76bf6..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexUpdater.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.streamer.index;
-
-import org.gridgain.grid.*;
-import org.jetbrains.annotations.*;
-
-/**
- * Index updater. The main responsibility of index updater is to maintain index values
- * up to date whenever events are added or removed from window.
- * <p>
- * Updater is provided to index provider in configuration usually via
- * {@link GridStreamerIndexProviderAdapter#setUpdater(GridStreamerIndexUpdater)} method.
- */
-public interface GridStreamerIndexUpdater<E, K, V> {
-    /**
-     * Given an event, extract index key. For example, if you have a 'Person' object
-     * with field 'age' and need to index based on this field, then this method
-     * should return the value of age field.
-     * <p>
-     * If {@code null} is returned then event will be ignored by the index.
-     *
-     * @param evt Event being added or removed from the window.
-     * @return Index key for this event.
-     */
-    @Nullable public K indexKey(E evt);
-
-    /**
-     * Gets initial value for the index or {@code null} if event should be ignored.
-     * This method is called every time when an entry is added to the window in
-     * order to get initial value for given key.
-     *
-     * @param evt Event being added to or removed from window.
-     * @param key Index key return by {@link #indexKey(Object)} method.
-     * @return Initial value for given key, if {@code null} then event will be
-     *      ignored and index entry will not be created.
-     */
-    @Nullable public V initialValue(E evt, K key);
-
-    /**
-     * Callback invoked whenever an event is being added to the window. Given a key and
-     * a current index value for this key, the implementation should return the new
-     * value for this key. If returned value is {@code null}, then current entry will
-     * be removed from the index.
-     * <p>
-     * If index is sorted, then sorting happens based on the returned value.
-     *
-     * @param entry Current index entry.
-     * @param evt New event.
-     * @return New index value for given key, if {@code null}, then current
-     *      index entry will be removed the index.
-     * @throws GridException If entry should not be added to index (e.g. if uniqueness is violated).
-     */
-    @Nullable public V onAdded(GridStreamerIndexEntry<E, K, V> entry, E evt) throws GridException;
-
-    /**
-     * Callback invoked whenever an event is being removed from the window and has
-     * index entry for given key. If there was no entry for given key, then
-     * {@code onRemoved()} will not be called.
-     * <p>
-     * Given a key and a current index value for this key, the implementation should return the new
-     * value for this key. If returned value is {@code null}, then current entry will
-     * be removed from the index.
-     * <p>
-     * If index is sorted, then sorting happens based on the returned value.
-     *
-     * @param entry Current index entry.
-     * @param evt Event being removed from the window.
-     * @return New index value for given key, if {@code null}, then current
-     *      index entry will be removed the index.
-     */
-    @Nullable public V onRemoved(GridStreamerIndexEntry<E, K, V> entry, E evt);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java
new file mode 100644
index 0000000..5f069cb
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndex.java
@@ -0,0 +1,297 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * User view on streamer index. Streamer indexes are used for fast look ups into streamer windows.
+ * <p>
+ * Streamer index can be accessed from {@link org.gridgain.grid.streamer.StreamerWindow} via any of the following methods:
+ * <ul>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index()}</li>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#index(String)}</li>
+ * <li>{@link org.gridgain.grid.streamer.StreamerWindow#indexes()}</li>
+ * </ul>
+ * <p>
+ * Indexes are created and provided for streamer windows by {@link StreamerIndexProvider} which is
+ * specified in streamer configuration.
+ * <h1 class="header">Example of how to use indexes</h1>
+ * <p>
+ * Stock price events are streamed into the system, the stock price event is an object containing stock symbol and price.
+ * We need to get minimum price for GOOG instrument.
+ * <p>
+ * Here is {@link StreamerIndexUpdater} that maintains index values up to date:
+ * <pre name="code" class="java">
+ * class StockPriceIndexUpdater implements StreamerIndexUpdater&lt;StockPriceEvent, String, Double&gt; {
+ *     &#64;Nullable &#64;Override public String indexKey(StockPriceEvent evt) {
+ *         return evt.getSymbol(); // Symbol is an index key.
+ *     }
+ *
+ *     &#64;Nullable &#64;Override public Double initialValue(StockPriceEvent evt, String key) {
+ *         return evt.getPrice(); // Set first event's price as an initial value.
+ *     }
+ *
+ *     &#64;Nullable &#64;Override public Double onAdded(StreamerIndexEntry&lt;StockPriceEvent, String, Double&gt; entry,
+ *         StockPriceEvent evt) throws GridException {
+ *         return Math.min(entry.value(), evt.getPrice()); // Update the minimum on new event.
+ *     }
+ *
+ *     &#64;Nullable &#64;Override
+ *     public Double onRemoved(StreamerIndexEntry&lt;StockPriceEvent, String, Double&gt; entry, StockPriceEvent evt) {
+ *         return entry.value(); // Leave minimum unchanged when event is evicted.
+ *     }
+ * }
+ * </pre>
+ * <p>
+ * Here is the code that queries minimum price for GOOG instrument using index:
+ * <pre name="code" class="java">
+ * double minGooglePrice = streamer.context().reduce(
+ *     // This closure will execute on remote nodes.
+ *     new GridClosure&lt;GridStreamerContext, Double&gt;() {
+ *         &#64;Nullable &#64;Override public Double apply(GridStreamerContext ctx) {
+ *             StreamerIndex&lt;StockPriceEvent, String, Double&gt; minIdx = ctx.&lt;StockPriceEvent&gt;window().index("min-price");
+ *
+ *             return minIdx.entry("GOOG").value();
+ *         }
+ *     },
+ *     new GridReducer&lt;Double, Double&gt;() {
+ *         private double minPrice = Double.MAX_VALUE;
+ *
+ *         &#64;Override public boolean collect(Double price) {
+ *             minPrice = Math.min(minPrice, price); // Take minimum price from all nodes.
+ *
+ *             return true;
+ *         }
+ *
+ *         &#64;Override public Double reduce() {
+ *             return minPrice;
+ *         }
+ *     }
+ * );
+ * </pre>
+ */
+public interface StreamerIndex<E, K, V> extends Iterable<StreamerIndexEntry<E, K, V>> {
+    /**
+     * Index name.
+     *
+     * @return Index name.
+     */
+    @Nullable public String name();
+
+    /**
+     * Gets index unique flag. If index is unique then exception
+     * will be thrown if key is already present in the index.
+     *
+     * @return Index unique flag.
+     */
+    public boolean unique();
+
+    /**
+     * Returns {@code true} if index supports sorting and therefore can perform range operations.
+     * <p>
+     * Note that sorting happens by value and not by key.
+     *
+     * @return Index sorted flag.
+     */
+    public boolean sorted();
+
+    /**
+     * Gets index policy.
+     *
+     * @return Index policy.
+     */
+    public StreamerIndexPolicy policy();
+
+    /**
+     * @return Number of entries in the index.
+     */
+    public int size();
+
+    /**
+     * Gets index entry for given key.
+     *
+     * @param key Key for which to retrieve entry.
+     * @return Entry for given key, or {@code null} if one could not be found.
+     */
+    @Nullable public StreamerIndexEntry<E, K, V> entry(K key);
+
+    /**
+     * Gets read-only collection of entries in the index.
+     * <p>
+     * Returned collection is ordered for sorted indexes.
+     *
+     * @param cnt If 0 then all entries are returned,
+     *      if positive, then returned collection contains up to {@code cnt} elements
+     *      (in ascending order for sorted indexes),
+     *      if negative, then returned collection contains up to {@code |cnt|} elements
+     *      (in descending order for sorted indexes and not supported for unsorted indexes).
+     * @return Collection of entries in the index.
+     */
+    public Collection<StreamerIndexEntry<E, K, V>> entries(int cnt);
+
+    /**
+     * Gets read-only set of index keys.
+     * <p>
+     * Returned collection is ordered for sorted indexes.
+     *
+     * @param cnt If 0 then all keys are returned,
+     *      if positive, then returned collection contains up to {@code cnt} elements
+     *      (in ascending order for sorted indexes),
+     *      if negative, then returned collection contains up to {@code |cnt|} elements
+     *      (in descending order for sorted indexes and not supported for unsorted indexes).
+     * @return Read-only set of index keys within given position range.
+     */
+    public Set<K> keySet(int cnt);
+
+    /**
+     * Gets read-only collection of index values.
+     * <p>
+     * Returned collection is ordered for sorted indexes.
+     *
+     * @param cnt If 0 then all values are returned,
+     *      if positive, then returned collection contains up to {@code cnt} elements
+     *      (in ascending order for sorted indexes),
+     *      if negative, then returned collection contains up to {@code |cnt|} elements
+     *      (in descending order for sorted indexes and not supported for unsorted indexes).
+     * @return Read-only collections of index values.
+     */
+    public Collection<V> values(int cnt);
+
+    /**
+     * Gets read-only collection of index events.
+     * <p>
+     * For sorted indexes events are guaranteed to be grouped by corresponding values, however
+     * the order of the events corresponding to the same value is not defined.
+     *
+     * @param cnt If 0 then all events are returned,
+     *      if positive, then returned collection contains up to {@code cnt} elements
+     *      (in ascending order of values for sorted indexes),
+     *      if negative, then returned collection contains up to {@code |cnt|} elements
+     *      (in descending order of values for sorted indexes and not supported for unsorted indexes).
+     * @return Read-only collections of index events.
+     * @throws IllegalStateException If index is not configured to track events.
+     * @see StreamerIndexPolicy
+     */
+    public Collection<E> events(int cnt);
+
+    /**
+     * Gets read-only set of index entries with given value.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @param val Value.
+     * @return Read-only set of index entries with given value.
+     */
+    public Set<StreamerIndexEntry<E, K, V>> entrySet(V val);
+
+    /**
+     * Gets read-only set of index entries within given value range.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @param asc {@code True} for ascending set.
+     * @param fromVal From value, if {@code null}, then start from beginning.
+     * @param toVal To value, if {@code null} then include all entries until the end.
+     * @param fromIncl Whether or not left value is inclusive (ignored if {@code fromVal} is {@code null}).
+     * @param toIncl Whether or not right value is inclusive (ignored if {@code toVal} is {@code null}).
+     * @return Read-only set of index entries within given value range.
+     */
+    public Set<StreamerIndexEntry<E, K, V>> entrySet(boolean asc, @Nullable V fromVal, boolean fromIncl,
+        @Nullable V toVal, boolean toIncl);
+
+    /**
+     * Gets read-only set of index keys with given value. Iteration order over
+     * this set has the same order as within index.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @param val Value.
+     * @return Read-only set of index keys with given value.
+     */
+    public Set<K> keySet(V val);
+
+    /**
+     * Gets read-only set of index keys within given value range. Iteration order over
+     * this set has the same order as within index.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @param asc {@code True} for ascending set.
+     * @param fromVal From value, if {@code null}, then start from beginning.
+     * @param toVal To value, if {@code null} then include all entries until the end.
+     * @param fromIncl Whether or not left value is inclusive (ignored if {@code fromVal} is {@code null}).
+     * @param toIncl Whether or not right value is inclusive (ignored if {@code toVal} is {@code null}).
+     * @return Read-only set of index keys within given value range.
+     */
+    public Set<K> keySet(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
+
+    /**
+     * Gets read-only collection of index values within given value range. Iteration order over
+     * this collection has the same order as within index.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @param asc {@code True} for ascending set.
+     * @param fromVal From value, if {@code null}, then start from beginning.
+     * @param toVal To value, if {@code null} then include all entries until the end.
+     * @param fromIncl Whether or not left value is inclusive (ignored if {@code fromVal} is {@code null}).
+     * @param toIncl Whether or not right value is inclusive (ignored if {@code toVal} is {@code null}).
+     * @return Read-only collection of index values within given value range.
+     */
+    public Collection<V> values(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
+
+    /**
+     * Gets read-only collection of index events.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @param val From value, if {@code null}, then start from beginning.
+     * @return Read-only collection of index events with given value.
+     */
+    public Collection<E> events(@Nullable V val);
+
+    /**
+     * Gets read-only collection of index events.
+     * <p>
+     * Events are guaranteed to be sorted by corresponding values, however
+     * the order of the events corresponding to the same value is not defined.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @param asc {@code True} for ascending set.
+     * @param fromVal From value, if {@code null}, then start from beginning.
+     * @param toVal To value, if {@code null} then include all entries until the end.
+     * @param fromIncl Whether or not left value is inclusive (ignored if {@code fromVal} is {@code null}).
+     * @param toIncl Whether or not right value is inclusive (ignored if {@code toVal} is {@code null}).
+     * @return Read-only collection of index events within given value range.
+     */
+    public Collection<E> events(boolean asc, @Nullable V fromVal, boolean fromIncl, @Nullable V toVal, boolean toIncl);
+
+    /**
+     * Gets first entry in the index.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @return First entry in the index, or {@code null} if index is empty.
+     */
+    @Nullable public StreamerIndexEntry<E, K, V> firstEntry();
+
+    /**
+     * Gets last entry in the index.
+     * <p>
+     * This operation is only available for sorted indexes.
+     *
+     * @return Last entry in the index, or {@code null} if index is empty.
+     */
+    @Nullable public StreamerIndexEntry<E, K, V> lastEntry();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java
new file mode 100644
index 0000000..4bbf056
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexEntry.java
@@ -0,0 +1,49 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Streamer index entry. Individual index entry contains index key, value, and all events
+ * associated with given key.
+ *
+ */
+public interface StreamerIndexEntry<E, K, V> {
+    /**
+     * Gets events associated with given index key and value.
+     * <p>
+     * Events are tracked only if {@link StreamerIndexProvider#getPolicy()}
+     * is set to {@link StreamerIndexPolicy#EVENT_TRACKING_ON} or
+     * {@link StreamerIndexPolicy#EVENT_TRACKING_ON_DEDUP}.
+     *
+     * @return Events associated with given index key and value or {@code null} if event tracking is off.
+     */
+    @Nullable public Collection<E> events();
+
+    /**
+     * Gets index entry key.
+     *
+     * @return Index entry key.
+     */
+    public K key();
+
+    /**
+     * Gets index entry value.
+     * <p>
+     * For sorted indexes, the sorting happens based on this value.
+     *
+     * @return Index entry value.
+     */
+    public V value();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f54e7bac/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java
new file mode 100644
index 0000000..ce1bcb2
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexPolicy.java
@@ -0,0 +1,42 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.streamer.index;
+
+/**
+ * Streamer index policy, which defines how events
+ * are tracked within an index.
+ */
+public enum StreamerIndexPolicy {
+    /**
+     * Do not track events.
+     * <p>
+     * Only a value, generated by {@link StreamerIndexUpdater},
+     * will be stored in an index; event objects will be thrown away.
+     */
+    EVENT_TRACKING_OFF,
+
+    /**
+     * Track events.
+     * <p>
+     * All event objects will be stored in an index along with the values,
+     * generated by {@link StreamerIndexUpdater}.
+     */
+    EVENT_TRACKING_ON,
+
+    /**
+     * Track events with de-duplication.
+     * <p>
+     * All event objects will be stored in an index along with the values,
+     * generated by {@link StreamerIndexUpdater}. For duplicate (equal)
+     * events, only a single event object will be stored, which corresponds
+     * to a first event.
+     */
+    EVENT_TRACKING_ON_DEDUP
+}