You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/03 19:39:58 UTC

[3/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl

# ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b33b651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b33b651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b33b651

Branch: refs/heads/ignite-394
Commit: 9b33b6510f5b82c30c8e75a66eb328b00bc425e4
Parents: 6909cc4
Author: Artem Shutak <as...@gridgain.com>
Authored: Tue Mar 3 21:40:19 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Tue Mar 3 21:40:19 2015 +0300

----------------------------------------------------------------------
 .../datagrid/CacheDataLoaderExample.java        |    6 +-
 .../datagrid/CachePopularNumbersExample.java    |    4 +-
 .../src/main/java/org/apache/ignite/Ignite.java |    6 +-
 .../org/apache/ignite/IgniteDataLoader.java     |  379 -----
 .../org/apache/ignite/IgniteDataStreamer.java   |  379 +++++
 .../apache/ignite/internal/IgniteKernal.java    |    2 +-
 .../processors/cache/GridCacheAdapter.java      |   10 +-
 .../GridDistributedCacheAdapter.java            |    4 +-
 .../dataload/GridDataLoadCacheUpdaters.java     |   18 +-
 .../dataload/GridDataLoadUpdateJob.java         |    4 +-
 .../dataload/GridDataLoaderFuture.java          |    4 +-
 .../dataload/GridDataLoaderProcessor.java       |   16 +-
 .../dataload/IgniteDataLoaderImpl.java          | 1453 ------------------
 .../dataload/IgniteDataStreamerImpl.java        | 1453 ++++++++++++++++++
 .../dr/GridDrDataLoadCacheUpdater.java          |    2 +-
 .../processors/igfs/IgfsDataManager.java        |   10 +-
 ...iteTxConsistencyRestartAbstractSelfTest.java |    2 +-
 ...idCachePartitionedHitsAndMissesSelfTest.java |    4 +-
 .../GridCacheLruNearEvictionPolicySelfTest.java |    2 +-
 ...heNearOnlyLruNearEvictionPolicySelfTest.java |    2 +-
 .../dataload/GridDataLoaderImplSelfTest.java    |    5 +-
 .../dataload/GridDataLoaderPerformanceTest.java |    2 +-
 .../GridDataLoaderProcessorSelfTest.java        |   27 +-
 .../loadtests/colocation/GridTestMain.java      |    2 +-
 .../loadtests/discovery/GridGcTimeoutTest.java  |    2 +-
 .../mapper/GridContinuousMapperLoadTest1.java   |    2 +-
 .../mapper/GridContinuousMapperLoadTest2.java   |    2 +-
 .../ignite/testframework/junits/IgniteMock.java |    4 +-
 .../scala/org/apache/ignite/scalar/scalar.scala |    8 +-
 .../org/apache/ignite/IgniteSpringBean.java     |    2 +-
 .../cache/IgniteSqlQueryBenchmark.java          |    2 +-
 .../cache/IgniteSqlQueryJoinBenchmark.java      |    2 +-
 32 files changed, 1909 insertions(+), 1911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
index 57b0cd2..4cdbfd4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
@@ -21,8 +21,8 @@ import org.apache.ignite.*;
 import org.apache.ignite.examples.*;
 
 /**
- * Demonstrates how cache can be populated with data utilizing {@link IgniteDataLoader} API.
- * {@link IgniteDataLoader} is a lot more efficient to use than standard
+ * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API.
+ * {@link IgniteDataStreamer} is a lot more efficient to use than standard
  * {@code put(...)} operation as it properly buffers cache requests
  * together and properly manages load on remote nodes.
  * <p>
@@ -63,7 +63,7 @@ public class CacheDataLoaderExample {
 
             long start = System.currentTimeMillis();
 
-            try (IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
+            try (IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
                 // Configure loader.
                 ldr.perNodeBufferSize(1024);
                 ldr.perNodeParallelLoadOperations(8);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
index 0f71681..1fc737b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
@@ -92,7 +92,7 @@ public class CachePopularNumbersExample {
      * @throws IgniteException If failed.
      */
     private static void streamData(final Ignite ignite) throws IgniteException {
-        try (IgniteDataLoader<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
+        try (IgniteDataStreamer<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
             // Set larger per-node buffer size since our state is relatively small.
             ldr.perNodeBufferSize(2048);
 
@@ -140,7 +140,7 @@ public class CachePopularNumbersExample {
     /**
      * Increments value for key.
      */
-    private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> {
+    private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> {
         /** */
         private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() {
             @Override public Void process(MutableEntry<Integer, Long> e, Object... args) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 8851d8f..44d4ba9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -42,7 +42,7 @@ import java.util.concurrent.*;
  * In addition to {@link ClusterGroup} functionality, from here you can get the following:
  * <ul>
  * <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed cache.</li>
- * <li>{@link IgniteDataLoader} - functionality for loading data large amounts of data into cache.</li>
+ * <li>{@link IgniteDataStreamer} - functionality for loading large amounts of data into cache.</li>
  * <li>{@link IgniteFs} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li>
  * <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li>
  * <li>{@link IgniteScheduler} - functionality for scheduling jobs using UNIX Cron syntax.</li>
@@ -205,12 +205,12 @@ public interface Ignite extends AutoCloseable {
     /**
      * Gets a new instance of data loader associated with given cache name. Data loader
      * is responsible for loading external data into in-memory data grid. For more information
-     * refer to {@link IgniteDataLoader} documentation.
+     * refer to {@link IgniteDataStreamer} documentation.
      *
      * @param cacheName Cache name ({@code null} for default cache).
      * @return Data loader.
      */
-    public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName);
+    public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName);
 
     /**
      * Gets an instance of IGFS - Ignite In-Memory File System, if one is not

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
deleted file mode 100644
index 3cff287..0000000
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite;
-
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Data loader is responsible for loading external data into cache. It achieves it by
- * properly buffering updates and properly mapping keys to nodes responsible for the data
- * to make sure that there is the least amount of data movement possible and optimal
- * network and memory utilization.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- * <p>
- * Also note that {@code GridDataLoader} is not the only way to load data into cache.
- * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
- * method to load data from underlying data store. You can also use standard
- * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
- * likely will not perform as well as this class for loading data. And finally,
- * data can be loaded from underlying data store on demand, whenever it is accessed -
- * for this no explicit data loading step is needed.
- * <p>
- * {@code IgniteDataLoader} supports the following configuration properties:
- * <ul>
- *  <li>
- *      {@link #perNodeBufferSize(int)} - when entries are added to data loader via
- *      {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
- *      away and are buffered internally for better performance and network utilization.
- *      This setting controls the size of internal per-node buffer before buffered data
- *      is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
- *      value.
- *  </li>
- *  <li>
- *      {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
- *      to the data loader via {@link #addData(Object, Object)} method faster than it can
- *      be put in cache. In this case, new buffered load messages are sent to remote nodes
- *      before responses from previous ones are received. This could cause unlimited heap
- *      memory utilization growth on local and remote nodes. To control memory utilization,
- *      this setting limits maximum allowed number of parallel buffered load messages that
- *      are being processed on remote nodes. If this number is exceeded, then
- *      {@link #addData(Object, Object)} method will block to control memory utilization.
- *      Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
- *  </li>
- *  <li>
- *      {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
- *      this is the time after which the loader will make an attempt to submit all data
- *      added so far to remote nodes. Note that there is no guarantee that data will be
- *      delivered after this concrete attempt (e.g., it can fail when topology is
- *      changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
- *  </li>
- *  <li>
- *      {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
- *      updates and allow data loader choose most optimal concurrent implementation.
- *  </li>
- *  <li>
- *      {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries.
- *      It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
- *  </li>
- *  <li>
- *      {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
- *      loaded by a data loader must be class-loadable from the same class-loader.
- *      Ignite will make the best effort to detect the most suitable class-loader
- *      for data loading. However, in complex cases, where compound or deeply nested
- *      class-loaders are used, it is best to specify a deploy class which can be any
- *      class loaded by the class-loader for given data.
- *  </li>
- * </ul>
- */
-public interface IgniteDataLoader<K, V> extends AutoCloseable {
-    /** Default max concurrent put operations count. */
-    public static final int DFLT_MAX_PARALLEL_OPS = 16;
-
-    /** Default per node buffer size. */
-    public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
-
-    /**
-     * Name of cache to load data to.
-     *
-     * @return Cache name or {@code null} for default cache.
-     */
-    public String cacheName();
-
-    /**
-     * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
-     * Default is {@code true}.
-     *
-     * @return Flag value.
-     */
-    public boolean allowOverwrite();
-
-    /**
-     * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
-     * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method.
-     * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
-     *
-     * @param allowOverwrite Flag value.
-     * @throws IgniteException If failed.
-     */
-    public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
-
-    /**
-     * Gets flag indicating that write-through behavior should be disabled for data loading.
-     * Default is {@code false}.
-     *
-     * @return Skip store flag.
-     */
-    public boolean skipStore();
-
-    /**
-     * Sets flag indicating that write-through behavior should be disabled for data loading.
-     * Default is {@code false}.
-     *
-     * @param skipStore Skip store flag.
-     */
-    public void skipStore(boolean skipStore);
-
-    /**
-     * Gets size of per node key-value pairs buffer.
-     *
-     * @return Per node buffer size.
-     */
-    public int perNodeBufferSize();
-
-    /**
-     * Sets size of per node key-value pairs buffer.
-     * <p>
-     * This method should be called prior to {@link #addData(Object, Object)} call.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
-     *
-     * @param bufSize Per node buffer size.
-     */
-    public void perNodeBufferSize(int bufSize);
-
-    /**
-     * Gets maximum number of parallel load operations for a single node.
-     *
-     * @return Maximum number of parallel load operations for a single node.
-     */
-    public int perNodeParallelLoadOperations();
-
-    /**
-     * Sets maximum number of parallel load operations for a single node.
-     * <p>
-     * This method should be called prior to {@link #addData(Object, Object)} call.
-     * <p>
-     * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
-     *
-     * @param parallelOps Maximum number of parallel load operations for a single node.
-     */
-    public void perNodeParallelLoadOperations(int parallelOps);
-
-    /**
-     * Gets automatic flush frequency. Essentially, this is the time after which the
-     * loader will make an attempt to submit all data added so far to remote nodes.
-     * Note that there is no guarantee that data will be delivered after this concrete
-     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
-     * <p>
-     * If set to {@code 0}, automatic flush is disabled.
-     * <p>
-     * Automatic flush is disabled by default (default value is {@code 0}).
-     *
-     * @return Flush frequency or {@code 0} if automatic flush is disabled.
-     * @see #flush()
-     */
-    public long autoFlushFrequency();
-
-    /**
-     * Sets automatic flush frequency. Essentially, this is the time after which the
-     * loader will make an attempt to submit all data added so far to remote nodes.
-     * Note that there is no guarantee that data will be delivered after this concrete
-     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
-     * <p>
-     * If set to {@code 0}, automatic flush is disabled.
-     * <p>
-     * Automatic flush is disabled by default (default value is {@code 0}).
-     *
-     * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
-     * @see #flush()
-     */
-    public void autoFlushFrequency(long autoFlushFreq);
-
-    /**
-     * Gets future for this loading process. This future completes whenever method
-     * {@link #close(boolean)} completes. By attaching listeners to this future
-     * it is possible to get asynchronous notifications for completion of this
-     * loading process.
-     *
-     * @return Future for this loading process.
-     */
-    public IgniteFuture<?> future();
-
-    /**
-     * Optional deploy class for peer deployment. All classes loaded by a data loader
-     * must be class-loadable from the same class-loader. Ignite will make the best
-     * effort to detect the most suitable class-loader for data loading. However,
-     * in complex cases, where compound or deeply nested class-loaders are used,
-     * it is best to specify a deploy class which can be any class loaded by
-     * the class-loader for given data.
-     *
-     * @param depCls Any class loaded by the class-loader for given data.
-     */
-    public void deployClass(Class<?> depCls);
-
-    /**
-     * Sets custom cache updater to this data loader.
-     *
-     * @param updater Cache updater.
-     */
-    public void updater(Updater<K, V> updater);
-
-    /**
-     * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
-     *
-     * @param key Key.
-     * @return Future fo this operation.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     */
-    public IgniteFuture<?> removeData(K key)  throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param key Key.
-     * @param val Value or {@code null} if respective entry must be removed from cache.
-     * @return Future fo this operation.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     */
-    public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
-        IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param entry Entry.
-     * @return Future fo this operation.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     */
-    public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
-        IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param entries Collection of entries to be loaded.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @return Future for this load operation.
-     */
-    public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param entries Map to be loaded.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @return Future for this load operation.
-     */
-    public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
-
-    /**
-     * Loads any remaining data, but doesn't close the loader. Data can be still added after
-     * flush is finished. This method blocks and doesn't allow to add any data until all data
-     * is loaded.
-     * <p>
-     * If another thread is already performing flush, this method will block, wait for
-     * another thread to complete flush and exit. If you don't want to wait in this case,
-     * use {@link #tryFlush()} method.
-     *
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @see #tryFlush()
-     */
-    public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
-    /**
-     * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
-     * with the difference that it won't wait and will exit immediately.
-     *
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @see #flush()
-     */
-    public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
-    /**
-     * Loads any remaining data and closes this loader.
-     *
-     * @param cancel {@code True} to cancel ongoing loading operations.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     */
-    public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
-
-    /**
-     * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
-     * <p>
-     * The method is invoked automatically on objects managed by the
-     * {@code try-with-resources} statement.
-     *
-     * @throws IgniteException If failed to close data loader.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     */
-    @Override public void close() throws IgniteException, IgniteInterruptedException;
-
-    /**
-     * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)}
-     * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
-     * performance custom user-defined implementation may help.
-     * <p>
-     * Data loader can be configured to use custom implementation of updater instead of default one using
-     * {@link IgniteDataLoader#updater(IgniteDataLoader.Updater)} method.
-     */
-    interface Updater<K, V> extends Serializable {
-        /**
-         * Updates cache with batch of entries.
-         *
-         * @param cache Cache.
-         * @param entries Collection of entries.
-         * @throws IgniteException If failed.
-         */
-        public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
new file mode 100644
index 0000000..c48d61a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Data loader is responsible for loading external data into cache. It achieves it by
+ * properly buffering updates and properly mapping keys to nodes responsible for the data
+ * to make sure that there is the least amount of data movement possible and optimal
+ * network and memory utilization.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ * <p>
+ * Also note that {@code IgniteDataStreamer} is not the only way to load data into cache.
+ * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
+ * method to load data from underlying data store. You can also use standard
+ * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
+ * likely will not perform as well as this class for loading data. And finally,
+ * data can be loaded from underlying data store on demand, whenever it is accessed -
+ * for this no explicit data loading step is needed.
+ * <p>
+ * {@code IgniteDataStreamer} supports the following configuration properties:
+ * <ul>
+ *  <li>
+ *      {@link #perNodeBufferSize(int)} - when entries are added to data loader via
+ *      {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
+ *      away and are buffered internally for better performance and network utilization.
+ *      This setting controls the size of internal per-node buffer before buffered data
+ *      is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
+ *      value.
+ *  </li>
+ *  <li>
+ *      {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
+ *      to the data loader via {@link #addData(Object, Object)} method faster than it can
+ *      be put in cache. In this case, new buffered load messages are sent to remote nodes
+ *      before responses from previous ones are received. This could cause unlimited heap
+ *      memory utilization growth on local and remote nodes. To control memory utilization,
+ *      this setting limits maximum allowed number of parallel buffered load messages that
+ *      are being processed on remote nodes. If this number is exceeded, then
+ *      {@link #addData(Object, Object)} method will block to control memory utilization.
+ *      Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ *  </li>
+ *  <li>
+ *      {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
+ *      this is the time after which the loader will make an attempt to submit all data
+ *      added so far to remote nodes. Note that there is no guarantee that data will be
+ *      delivered after this concrete attempt (e.g., it can fail when topology is
+ *      changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
+ *  </li>
+ *  <li>
+ *      {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
+ *      updates and allow the data loader to choose the most optimal concurrent implementation.
+ *  </li>
+ *  <li>
+ *      {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with loaded entries.
+ *      It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
+ *  </li>
+ *  <li>
+ *      {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
+ *      loaded by a data loader must be class-loadable from the same class-loader.
+ *      Ignite will make the best effort to detect the most suitable class-loader
+ *      for data loading. However, in complex cases, where compound or deeply nested
+ *      class-loaders are used, it is best to specify a deploy class which can be any
+ *      class loaded by the class-loader for given data.
+ *  </li>
+ * </ul>
+ */
+public interface IgniteDataStreamer<K, V> extends AutoCloseable {
+    /** Default max concurrent put operations count. */
+    public static final int DFLT_MAX_PARALLEL_OPS = 16;
+
+    /** Default per node buffer size. */
+    public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
+
+    /**
+     * Name of cache to load data to.
+     *
+     * @return Cache name or {@code null} for default cache.
+     */
+    public String cacheName();
+
+    /**
+     * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
+     * Default is {@code true}.
+     *
+     * @return Flag value.
+     */
+    public boolean allowOverwrite();
+
+    /**
+     * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
+     * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method.
+     * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
+     *
+     * @param allowOverwrite Flag value.
+     * @throws IgniteException If failed.
+     */
+    public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
+
+    /**
+     * Gets flag indicating that write-through behavior should be disabled for data loading.
+     * Default is {@code false}.
+     *
+     * @return Skip store flag.
+     */
+    public boolean skipStore();
+
+    /**
+     * Sets flag indicating that write-through behavior should be disabled for data loading.
+     * Default is {@code false}.
+     *
+     * @param skipStore Skip store flag.
+     */
+    public void skipStore(boolean skipStore);
+
+    /**
+     * Gets size of per node key-value pairs buffer.
+     *
+     * @return Per node buffer size.
+     */
+    public int perNodeBufferSize();
+
+    /**
+     * Sets size of per node key-value pairs buffer.
+     * <p>
+     * This method should be called prior to {@link #addData(Object, Object)} call.
+     * <p>
+     * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
+     *
+     * @param bufSize Per node buffer size.
+     */
+    public void perNodeBufferSize(int bufSize);
+
+    /**
+     * Gets maximum number of parallel load operations for a single node.
+     *
+     * @return Maximum number of parallel load operations for a single node.
+     */
+    public int perNodeParallelLoadOperations();
+
+    /**
+     * Sets maximum number of parallel load operations for a single node.
+     * <p>
+     * This method should be called prior to {@link #addData(Object, Object)} call.
+     * <p>
+     * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+     *
+     * @param parallelOps Maximum number of parallel load operations for a single node.
+     */
+    public void perNodeParallelLoadOperations(int parallelOps);
+
+    /**
+     * Gets automatic flush frequency. Essentially, this is the time after which the
+     * loader will make an attempt to submit all data added so far to remote nodes.
+     * Note that there is no guarantee that data will be delivered after this concrete
+     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+     * <p>
+     * If set to {@code 0}, automatic flush is disabled.
+     * <p>
+     * Automatic flush is disabled by default (default value is {@code 0}).
+     *
+     * @return Flush frequency or {@code 0} if automatic flush is disabled.
+     * @see #flush()
+     */
+    public long autoFlushFrequency();
+
+    /**
+     * Sets automatic flush frequency. Essentially, this is the time after which the
+     * loader will make an attempt to submit all data added so far to remote nodes.
+     * Note that there is no guarantee that data will be delivered after this concrete
+     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+     * <p>
+     * If set to {@code 0}, automatic flush is disabled.
+     * <p>
+     * Automatic flush is disabled by default (default value is {@code 0}).
+     *
+     * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
+     * @see #flush()
+     */
+    public void autoFlushFrequency(long autoFlushFreq);
+
+    /**
+     * Gets future for this loading process. This future completes whenever method
+     * {@link #close(boolean)} completes. By attaching listeners to this future
+     * it is possible to get asynchronous notifications for completion of this
+     * loading process.
+     *
+     * @return Future for this loading process.
+     */
+    public IgniteFuture<?> future();
+
+    /**
+     * Optional deploy class for peer deployment. All classes loaded by a data loader
+     * must be class-loadable from the same class-loader. Ignite will make the best
+     * effort to detect the most suitable class-loader for data loading. However,
+     * in complex cases, where compound or deeply nested class-loaders are used,
+     * it is best to specify a deploy class which can be any class loaded by
+     * the class-loader for given data.
+     *
+     * @param depCls Any class loaded by the class-loader for given data.
+     */
+    public void deployClass(Class<?> depCls);
+
+    /**
+     * Sets custom cache updater to this data loader.
+     *
+     * @param updater Cache updater.
+     */
+    public void updater(Updater<K, V> updater);
+
+    /**
+     * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
+     *
+     * @param key Key.
+     * @return Future for this operation.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     */
+    public IgniteFuture<?> removeData(K key)  throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param key Key.
+     * @param val Value or {@code null} if respective entry must be removed from cache.
+     * @return Future for this operation.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     */
+    public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
+        IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param entry Entry.
+     * @return Future for this operation.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     */
+    public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
+        IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param entries Collection of entries to be loaded.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @return Future for this load operation.
+     */
+    public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param entries Map to be loaded.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @return Future for this load operation.
+     */
+    public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
+
+    /**
+     * Loads any remaining data, but doesn't close the loader. Data can be still added after
+     * flush is finished. This method blocks and doesn't allow to add any data until all data
+     * is loaded.
+     * <p>
+     * If another thread is already performing flush, this method will block, wait for
+     * another thread to complete flush and exit. If you don't want to wait in this case,
+     * use {@link #tryFlush()} method.
+     *
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @see #tryFlush()
+     */
+    public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+    /**
+     * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
+     * with the difference that it won't wait and will exit immediately.
+     *
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @see #flush()
+     */
+    public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+    /**
+     * Loads any remaining data and closes this loader.
+     *
+     * @param cancel {@code True} to cancel ongoing loading operations.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     */
+    public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
+
+    /**
+     * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
+     * <p>
+     * The method is invoked automatically on objects managed by the
+     * {@code try-with-resources} statement.
+     *
+     * @throws IgniteException If failed to close data loader.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     */
+    @Override public void close() throws IgniteException, IgniteInterruptedException;
+
+    /**
+     * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataStreamer#allowOverwrite(boolean)}
+     * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
+     * performance custom user-defined implementation may help.
+     * <p>
+     * Data loader can be configured to use custom implementation of updater instead of default one using
+     * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method.
+     */
+    interface Updater<K, V> extends Serializable {
+        /**
+         * Updates cache with batch of entries.
+         *
+         * @param cache Cache.
+         * @param entries Collection of entries.
+         * @throws IgniteException If failed.
+         */
+        public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f46d071..336f872 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2346,7 +2346,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+    @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
         guard();
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 12ea535..6ed5699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3877,7 +3877,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+            IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
 
             try {
                 ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -4043,7 +4043,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @throws IgniteCheckedException If failed.
      */
     private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
-        try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
+        try (final IgniteDataStreamer<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
             ldr.allowOverwrite(true);
             ldr.skipStore(true);
 
@@ -4086,7 +4086,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+            IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
 
             try {
                 ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -6134,7 +6134,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final Collection<Map.Entry<K, V>> col;
 
         /** */
-        final IgniteDataLoaderImpl<K, V> ldr;
+        final IgniteDataStreamerImpl<K, V> ldr;
 
         /** */
         final ExpiryPolicy plc;
@@ -6145,7 +6145,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
          * @param plc Optional expiry policy.
          */
         private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
-            IgniteDataLoaderImpl<K, V> ldr,
+            IgniteDataStreamerImpl<K, V> ldr,
             @Nullable ExpiryPolicy plc) {
             this.p = p;
             this.ldr = ldr;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 00190d9..c99efc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -276,8 +276,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 else
                     dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
 
-                try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) {
-                    ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0);
+                try (IgniteDataStreamer<K, V> dataLdr = ignite.dataLoader(cacheName)) {
+                    ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
 
                     dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
index e2e780b..78a7e62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
@@ -29,13 +29,13 @@ import java.util.*;
  */
 public class GridDataLoadCacheUpdaters {
     /** */
-    private static final IgniteDataLoader.Updater INDIVIDUAL = new Individual();
+    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
 
     /** */
-    private static final IgniteDataLoader.Updater BATCHED = new Batched();
+    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
 
     /** */
-    private static final IgniteDataLoader.Updater BATCHED_SORTED = new BatchedSorted();
+    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
 
     /**
      * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
@@ -44,7 +44,7 @@ public class GridDataLoadCacheUpdaters {
      *
      * @return Single updater.
      */
-    public static <K, V> IgniteDataLoader.Updater<K, V> individual() {
+    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
         return INDIVIDUAL;
     }
 
@@ -55,7 +55,7 @@ public class GridDataLoadCacheUpdaters {
      *
      * @return Batched updater.
      */
-    public static <K, V> IgniteDataLoader.Updater<K, V> batched() {
+    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
         return BATCHED;
     }
 
@@ -66,7 +66,7 @@ public class GridDataLoadCacheUpdaters {
      *
      * @return Batched sorted updater.
      */
-    public static <K extends Comparable<?>, V> IgniteDataLoader.Updater<K, V> batchedSorted() {
+    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
         return BATCHED_SORTED;
     }
 
@@ -93,7 +93,7 @@ public class GridDataLoadCacheUpdaters {
     /**
      * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
      */
-    private static class Individual<K, V> implements IgniteDataLoader.Updater<K, V> {
+    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -120,7 +120,7 @@ public class GridDataLoadCacheUpdaters {
     /**
      * Batched updater. Updates cache using batch operations thus is dead lock prone.
      */
-    private static class Batched<K, V> implements IgniteDataLoader.Updater<K, V> {
+    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -160,7 +160,7 @@ public class GridDataLoadCacheUpdaters {
     /**
      * Batched updater. Updates cache using batch operations thus is dead lock prone.
      */
-    private static class BatchedSorted<K, V> implements IgniteDataLoader.Updater<K, V> {
+    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
index 9e2a483..8aa554a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
@@ -48,7 +48,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
     private final boolean skipStore;
 
     /** */
-    private final IgniteDataLoader.Updater<K, V> updater;
+    private final IgniteDataStreamer.Updater<K, V> updater;
 
     /**
      * @param ctx Context.
@@ -65,7 +65,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
         Collection<Map.Entry<K, V>> col,
         boolean ignoreDepOwnership,
         boolean skipStore,
-        IgniteDataLoader.Updater<K, V> updater) {
+        IgniteDataStreamer.Updater<K, V> updater) {
         this.ctx = ctx;
         this.log = log;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
index 5efcfe9..dffa862 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
@@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
 
     /** Data loader. */
     @GridToStringExclude
-    private IgniteDataLoaderImpl dataLdr;
+    private IgniteDataStreamerImpl dataLdr;
 
     /**
      * Default constructor for {@link Externalizable} support.
@@ -47,7 +47,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
      * @param ctx Context.
      * @param dataLdr Data loader.
      */
-    GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
+    GridDataLoaderFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
         super(ctx);
 
         assert dataLdr != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
index d470d02..b29c9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
  */
 public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
     /** Loaders map (access is not supposed to be highly concurrent). */
-    private Collection<IgniteDataLoaderImpl> ldrs = new GridConcurrentHashSet<>();
+    private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -50,7 +50,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
     private Thread flusher;
 
     /** */
-    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>();
+    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
 
     /** Marshaller. */
     private final Marshaller marsh;
@@ -80,7 +80,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
         flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
             @Override protected void body() throws InterruptedException {
                 while (!isCancelled()) {
-                    IgniteDataLoaderImpl<K, V> ldr = flushQ.take();
+                    IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
 
                     if (!busyLock.enterBusy())
                         return;
@@ -118,7 +118,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
         U.interrupt(flusher);
         U.join(flusher, log);
 
-        for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) {
+        for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
             if (log.isDebugEnabled())
                 log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
 
@@ -142,12 +142,12 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
      * @param compact {@code true} if data loader should transfer data in compact format.
      * @return Data loader.
      */
-    public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
+    public IgniteDataStreamerImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to create data loader (grid is stopping).");
 
         try {
-            final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);
+            final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
 
             ldrs.add(ldr);
 
@@ -173,7 +173,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
      * @param cacheName Cache name ({@code null} for default cache).
      * @return Data loader.
      */
-    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+    public IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
         return dataLoader(cacheName, true);
     }
 
@@ -234,7 +234,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
             }
 
             Collection<Map.Entry<K, V>> col;
-            IgniteDataLoader.Updater<K, V> updater;
+            IgniteDataStreamer.Updater<K, V> updater;
 
             try {
                 col = marsh.unmarshal(req.collectionBytes(), clsLdr);