You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/03 19:39:56 UTC

[1/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-394 [created] 9b33b6510


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
new file mode 100644
index 0000000..3f94752
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
@@ -0,0 +1,1453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.internal.processors.portable.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Data loader implementation.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+    /** Shared isolated updater; while it is installed {@link #allowOverwrite()} returns {@code false}. */
+    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
+    /** Cache updater passed to update jobs (isolated updater by default). */
+    private Updater<K, V> updater = ISOLATED_UPDATER;
+
+    /** Marshalled updater, created lazily when a batch is first sent to a remote node. */
+    private byte[] updaterBytes;
+
+    /** Max remap count before issuing an error. */
+    private static final int DFLT_MAX_REMAP_CNT = 32;
+
+    /** Log reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Cache name ({@code null} for default cache). */
+    private final String cacheName;
+
+    /** Portable enabled flag. */
+    private final boolean portableEnabled;
+
+    /**
+     *  If {@code true} then data will be transferred in compact format (only keys and values).
+     *  Otherwise full map entry will be transferred (this is required by DR internal logic).
+     */
+    private final boolean compact;
+
+    /** Per-node buffer size. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+    /** Number of permits given to the semaphore of each newly created per-node buffer. */
+    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+    /** Auto flush frequency in milliseconds; {@code 0} means this loader is not kept in the flush queue. */
+    private long autoFlushFreq;
+
+    /** Per-node buffers keyed by node ID. */
+    @GridToStringInclude
+    private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Discovery listener. */
+    private final GridLocalEventListener discoLsnr;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /** Communication topic for responses. */
+    private final Object topic;
+
+    /** Marshalled response topic, created lazily when a batch is first sent to a remote node. */
+    private byte[] topicBytes;
+
+    /** {@code True} if data loader has been cancelled. */
+    private volatile boolean cancelled;
+
+    /** Active futures of this data loader. */
+    @GridToStringInclude
+    private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
+
+    /** Closure that removes a completed future from the active futures. */
+    @GridToStringExclude
+    private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
+        @Override public void apply(IgniteInternalFuture<?> t) {
+            boolean rmv = activeFuts.remove(t);
+
+            assert rmv;
+        }
+    };
+
+    /** Job peer deploy aware. */
+    private volatile GridPeerDeployAware jobPda;
+
+    /** Deployment class. */
+    private Class<?> depCls;
+
+    /** Future to track loading finish (completed by {@link #closeEx(boolean)}). */
+    private final GridFutureAdapter<?> fut;
+
+    /** Public API future to track loading finish. */
+    private final IgniteFuture<?> publicFut;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Closed flag (set once by {@link #closeEx(boolean)}). */
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /** Time of the last flush, as reported by {@code U.currentTimeMillis()}. */
+    private volatile long lastFlushTime = U.currentTimeMillis();
+
+    /** Delay queue this loader is added to while auto flush frequency is non-zero. */
+    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
+
+    /** Skip store flag passed to update jobs. */
+    private boolean skipStore;
+
+    /** Max remap count before an operation is failed. */
+    private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+
+    /**
+     * Creates the loader and registers its listeners; fails with {@link IllegalStateException} if the cache has no nodes.
+     * @param ctx Grid kernal context.
+     * @param cacheName Cache name.
+     * @param flushQ Flush queue.
+     * @param compact If {@code true} only keys and values are transferred, otherwise full map entries (needed by DR).
+     */
+    public IgniteDataStreamerImpl(
+        final GridKernalContext ctx,
+        @Nullable final String cacheName,
+        DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
+        boolean compact
+    ) {
+        assert ctx != null;
+
+        this.ctx = ctx;
+        this.cacheName = cacheName;
+        this.flushQ = flushQ;
+        this.compact = compact;
+
+        log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
+
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+        if (node == null)
+            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+
+        portableEnabled = ctx.portable().portableEnabled(node, cacheName);
+
+        discoLsnr = new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                UUID id = discoEvt.eventNode().id();
+
+                // Remap regular mappings.
+                final Buffer buf = bufMappings.remove(id);
+
+                if (buf != null) {
+                    // Only async notification is possible since
+                    // discovery thread may be trapped otherwise.
+                    ctx.closure().callLocalSafe(
+                        new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                buf.onNodeLeft();
+
+                                return null;
+                            }
+                        },
+                        true /* system pool */
+                    );
+                }
+            }
+        };
+
+        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        // Generate unique topic for this loader.
+        topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+        ctx.io().addMessageListener(topic, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof GridDataLoadResponse;
+
+                GridDataLoadResponse res = (GridDataLoadResponse)msg;
+
+                if (log.isDebugEnabled())
+                    log.debug("Received data load response: " + res);
+
+                Buffer buf = bufMappings.get(nodeId);
+
+                if (buf != null)
+                    buf.onResponse(res);
+                // No buffer is mapped for the sender any more, so the response is dropped.
+                else if (log.isDebugEnabled())
+                    log.debug("Ignoring response since node has left [nodeId=" + nodeId + ']');
+            }
+        });
+
+        if (log.isDebugEnabled())
+            log.debug("Added response listener within topic: " + topic);
+
+        fut = new GridDataLoaderFuture(ctx, this);
+
+        publicFut = new IgniteFutureImpl<>(fut);
+    }
+
+    /**
+     * Enters busy lock.
+     */
+    private void enterBusy() {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Data loader has been closed.");
+    }
+
+    /**
+     * Leaves busy lock.
+     */
+    private void leaveBusy() {
+        busyLock.leaveBusy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> future() {
+        return publicFut;
+    }
+
+    /**
+     * @return Internal future.
+     */
+    public IgniteInternalFuture<?> internalFuture() {
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deployClass(Class<?> depCls) {
+        this.depCls = depCls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updater(Updater<K, V> updater) {
+        A.notNull(updater, "updater");
+
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allowOverwrite() {
+        return updater != ISOLATED_UPDATER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void allowOverwrite(boolean allow) {
+        if (allow != allowOverwrite()) {
+            // The switch is refused when no node in the current topology has the cache.
+            if (F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes()) == null)
+                throw new IgniteException("Failed to get node for cache: " + cacheName);
+            if (allow)
+                updater = GridDataLoadCacheUpdaters.<K, V>individual();
+            else
+                updater = ISOLATED_UPDATER;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void skipStore(boolean skipStore) {
+        this.skipStore = skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public String cacheName() {
+        return cacheName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int perNodeBufferSize() {
+        return bufSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void perNodeBufferSize(int bufSize) {
+        A.ensure(bufSize > 0, "bufSize > 0");
+
+        this.bufSize = bufSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int perNodeParallelLoadOperations() {
+        return parallelOps;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void perNodeParallelLoadOperations(int parallelOps) {
+        this.parallelOps = parallelOps;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long autoFlushFrequency() {
+        return autoFlushFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void autoFlushFrequency(long autoFlushFreq) {
+        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+        final long prevFreq = this.autoFlushFreq;
+
+        if (prevFreq == autoFlushFreq)
+            return;
+
+        this.autoFlushFreq = autoFlushFreq;
+
+        if (autoFlushFreq == 0)
+            flushQ.remove(this); // Auto flush switched off.
+        else if (prevFreq == 0)
+            flushQ.add(this); // Auto flush switched on.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+        A.notNull(entries, "entries");
+
+        return addData(entries.entrySet());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+        A.notEmpty(entries, "entries");
+
+        enterBusy();
+
+        try {
+            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
+            // The listener drops the future from the active set once it completes.
+            resFut.listenAsync(rmvActiveFut);
+
+            activeFuts.add(resFut);
+
+            Collection<K> keys = null;
+            // Several entries: track outstanding keys so the result completes only when the set drains.
+            if (entries.size() > 1) {
+                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+                for (Map.Entry<K, V> entry : entries)
+                    keys.add(entry.getKey());
+            }
+            // Initial mapping attempt, hence zero remaps so far.
+            load0(entries, resFut, keys, 0);
+
+            return new IgniteFutureImpl<>(resFut);
+        }
+        catch (IgniteException e) {
+            return new IgniteFinishedFutureImpl<>(ctx, e); // Reported through the returned future, not thrown.
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+        A.notNull(entry, "entry");
+
+        return addData(F.asList(entry));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(K key, V val) {
+        A.notNull(key, "key");
+
+        return addData(new Entry0<>(key, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> removeData(K key) {
+        return addData(key, null);
+    }
+
+    /** Maps entries to nodes and adds them to per-node buffers; a failed batch is remapped until the remap limit.
+     * @param entries Entries.
+     * @param resFut Result future.
+     * @param activeKeys Active keys ({@code null} when a single entry is loaded).
+     * @param remaps Remaps count.
+     */
+    private void load0(
+        Collection<? extends Map.Entry<K, V>> entries,
+        final GridFutureAdapter<Object> resFut,
+        @Nullable final Collection<K> activeKeys,
+        final int remaps
+    ) {
+        assert entries != null;
+
+        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
+
+        boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+        for (Map.Entry<K, V> entry : entries) {
+            List<ClusterNode> nodes;
+
+            try {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                if (initPda) {
+                    jobPda = new DataLoaderPda(key, entry.getValue(), updater);
+
+                    initPda = false;
+                }
+
+                nodes = nodes(key);
+            }
+            catch (IgniteCheckedException e) {
+                resFut.onDone(e);
+
+                return;
+            }
+
+            if (F.isEmpty(nodes)) {
+                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+                    "(no nodes with cache found in topology) [infos=" + entries.size() +
+                    ", cacheName=" + cacheName + ']'));
+
+                return;
+            }
+
+            for (ClusterNode node : nodes) {
+                Collection<Map.Entry<K, V>> col = mappings.get(node);
+
+                if (col == null)
+                    mappings.put(node, col = new ArrayList<>());
+
+                col.add(entry);
+            }
+        }
+
+        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
+            final UUID nodeId = e.getKey().id();
+
+            Buffer buf = bufMappings.get(nodeId);
+
+            if (buf == null) {
+                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+                if (old != null)
+                    buf = old;
+            }
+
+            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
+
+            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> t) {
+                    try {
+                        t.get();
+
+                        if (activeKeys != null) {
+                            for (Map.Entry<K, V> e : entriesForNode)
+                                activeKeys.remove(e.getKey());
+
+                            if (activeKeys.isEmpty())
+                                resFut.onDone();
+                        }
+                        else {
+                            assert entriesForNode.size() == 1;
+
+                            // That has been a single key,
+                            // so complete result future right away.
+                            resFut.onDone();
+                        }
+                    }
+                    catch (IgniteCheckedException e1) {
+                        if (log.isDebugEnabled())
+                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                        if (cancelled) {
+                            resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " +
+                                IgniteDataStreamerImpl.this, e1));
+                        }
+                        else if (remaps + 1 > maxRemapCnt) {
+                            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+                                + remaps, e1)); // Fail with the remap-limit error, keeping e1 as its cause.
+                        }
+                        else
+                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
+                    }
+                }
+            };
+
+            GridFutureAdapter<?> f;
+
+            try {
+                f = buf.update(entriesForNode, lsnr);
+            }
+            catch (IgniteInterruptedCheckedException e1) {
+                resFut.onDone(e1);
+
+                return;
+            }
+            // Node is no longer in topology: drop its buffer and fail the batch so the listener can remap it.
+            if (ctx.discovery().node(nodeId) == null) {
+                if (bufMappings.remove(nodeId, buf))
+                    buf.onNodeLeft();
+
+                if (f != null)
+                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                        "(node has left): " + nodeId));
+            }
+        }
+    }
+
+    /**
+     * @param key Key to map.
+     * @return Single mapped node when overwrite is allowed, otherwise primary and backup nodes.
+     * @throws IgniteCheckedException If failed.
+     */
+    private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
+        if (allowOverwrite())
+            return Collections.singletonList(ctx.affinity().mapKeyToNode(cacheName, key));
+
+        return ctx.affinity().mapKeyToPrimaryAndBackups(cacheName, key);
+    }
+
+    /**
+     * Flushes all per-node buffers and waits until every future that was active on entry is done.
+     * The flush round is repeated while a buffer flush fails or some of those futures are still pending.
+     * @throws IgniteCheckedException If failed (for instance, thrown by {@code get()} of an awaited future).
+     */
+    private void doFlush() throws IgniteCheckedException {
+        lastFlushTime = U.currentTimeMillis();
+        // Snapshot of the futures that are still pending at this point (created lazily).
+        List<IgniteInternalFuture> activeFuts0 = null;
+
+        int doneCnt = 0;
+
+        for (IgniteInternalFuture<?> f : activeFuts) {
+            if (!f.isDone()) {
+                if (activeFuts0 == null)
+                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
+
+                activeFuts0.add(f);
+            }
+            else {
+                f.get();
+                // NOTE(review): this increment is never read; doneCnt is reset before its first use below.
+                doneCnt++;
+            }
+        }
+        // Nothing was pending on entry.
+        if (activeFuts0 == null || activeFuts0.isEmpty())
+            return;
+
+        while (true) {
+            Queue<IgniteInternalFuture<?>> q = null;
+            // Trigger a flush of every buffer and collect the futures that were returned.
+            for (Buffer buf : bufMappings.values()) {
+                IgniteInternalFuture<?> flushFut = buf.flush();
+
+                if (flushFut != null) {
+                    if (q == null)
+                        q = new ArrayDeque<>(bufMappings.size() * 2);
+
+                    q.add(flushFut);
+                }
+            }
+
+            if (q != null) {
+                assert !q.isEmpty();
+
+                boolean err = false;
+                // Wait for every flush future; a failure only triggers another flush round.
+                for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to flush buffer: " + e);
+
+                        err = true;
+                    }
+                }
+
+                if (err)
+                    // Remaps needed - flush buffers.
+                    continue;
+            }
+            // Count the leading run of completed snapshot futures; completed slots are nulled out.
+            doneCnt = 0;
+
+            for (int i = 0; i < activeFuts0.size(); i++) {
+                IgniteInternalFuture f = activeFuts0.get(i);
+
+                if (f == null)
+                    doneCnt++;
+                else if (f.isDone()) {
+                    f.get();
+
+                    doneCnt++;
+
+                    activeFuts0.set(i, null);
+                }
+                else
+                    break;
+            }
+            // Every snapshot future is done, otherwise start another round.
+            if (doneCnt == activeFuts0.size())
+                return;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void flush() throws IgniteException {
+        enterBusy();
+
+        try {
+            doFlush();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Flushes every internal buffer and records the flush time; does nothing if the
+     * busy lock cannot be entered.
+     * <p>
+     * Does not wait for the flush results, assuming that this method
+     * should be called periodically.
+     */
+    @Override public void tryFlush() throws IgniteInterruptedException {
+        if (!busyLock.enterBusy())
+            return;
+
+        try {
+            for (Buffer buf : bufMappings.values())
+                buf.flush();
+
+            lastFlushTime = U.currentTimeMillis();
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param cancel {@code True} to close with cancellation.
+     * @throws IgniteException If failed.
+     */
+    @Override public void close(boolean cancel) throws IgniteException {
+        try {
+            closeEx(cancel);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** Closes the loader once: flushes or cancels buffered data, removes listeners and completes the future.
+     * @param cancel {@code True} to close with cancellation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void closeEx(boolean cancel) throws IgniteCheckedException {
+        if (!closed.compareAndSet(false, true))
+            return;
+
+        busyLock.block();
+
+        if (log.isDebugEnabled())
+            log.debug("Closing data loader [ldr=" + this + ", cancel=" + cancel + ']');
+
+        IgniteCheckedException e = null;
+
+        try {
+            // Assuming that no methods are called on this loader after this method is called.
+            if (cancel) {
+                cancelled = true;
+                for (Buffer buf : bufMappings.values())
+                    buf.cancelAll();
+            }
+            else
+                doFlush();
+        }
+        catch (IgniteCheckedException e0) {
+            e = e0;
+        }
+        finally {
+            // Listeners are removed even when the flush fails, so a failed close does not leak them.
+            ctx.event().removeLocalEventListener(discoLsnr);
+            ctx.io().removeMessageListener(topic);
+        }
+
+        fut.onDone(null, e);
+
+        if (e != null)
+            throw e;
+    }
+
+    /**
+     * @return {@code true} if the loading future is done (it is completed at the end of {@link #closeEx(boolean)}).
+     */
+    boolean isClosed() {
+        return fut.isDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        close(false);
+    }
+
+    /**
+     * @return Max remap count.
+     */
+    public int maxRemapCount() {
+        return maxRemapCnt;
+    }
+
+    /**
+     * @param maxRemapCnt New max remap count.
+     */
+    public void maxRemapCount(int maxRemapCnt) {
+        this.maxRemapCnt = maxRemapCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDataStreamerImpl.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDelay(TimeUnit unit) {
+        return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return Next flush time.
+     */
+    private long nextFlushTime() {
+        return lastFlushTime + autoFlushFreq;
+    }
+
+    /** {@inheritDoc} Orders loaders by next flush time; equal times compare as {@code 0}. */
+    @Override public int compareTo(Delayed o) {
+        return Long.compare(nextFlushTime(), ((IgniteDataStreamerImpl)o).nextFlushTime());
+    }
+
+    /**
+     *
+     */
+    private class Buffer {
+        /** Node. */
+        private final ClusterNode node;
+
+        /** Active futures. */
+        private final Collection<IgniteInternalFuture<Object>> locFuts;
+
+        /** Buffered entries. */
+        private List<Map.Entry<K, V>> entries;
+
+        /** */
+        @GridToStringExclude
+        private GridFutureAdapter<Object> curFut;
+
+        /** Local node flag. */
+        private final boolean isLocNode;
+
+        /** ID generator. */
+        private final AtomicLong idGen = new AtomicLong();
+
+        /** Active futures. */
+        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+        /** */
+        private final Semaphore sem;
+
+        /** Closure to signal on task finish. */
+        @GridToStringExclude
+        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
+            @Override public void apply(IgniteInternalFuture<Object> t) {
+                signalTaskFinished(t);
+            }
+        };
+
+        /** Creates an empty buffer for the given node.
+         * @param node Node this buffer accumulates entries for.
+         */
+        Buffer(ClusterNode node) {
+            assert node != null;
+
+            this.node = node;
+
+            locFuts = new GridConcurrentHashSet<>();
+            reqs = new ConcurrentHashMap8<>();
+
+            // Cache local node flag.
+            isLocNode = node.equals(ctx.discovery().localNode());
+            // Start with an empty batch and a fresh future that releases a semaphore permit when it completes.
+            entries = newEntries();
+            curFut = new GridFutureAdapter<>(ctx);
+            curFut.listenAsync(signalC);
+            // The permit count is fixed here; later changes of parallelOps do not affect this buffer.
+            sem = new Semaphore(parallelOps);
+        }
+
+        /** Appends entries to the current batch and submits the batch once it holds at least bufSize entries.
+         * @param newEntries Entries to buffer.
+         * @param lsnr Listener attached to the future of the batch the entries were added to.
+         * @throws IgniteInterruptedCheckedException If interrupted while submitting the batch.
+         * @return Future of the batch the entries were added to.
+         */
+        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
+            IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+            List<Map.Entry<K, V>> entries0 = null;
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+
+                curFut0.listenAsync(lsnr);
+
+                for (Map.Entry<K, V> entry : newEntries)
+                    entries.add(entry);
+                // Batch is full: detach it together with its future and start a new batch.
+                if (entries.size() >= bufSize) {
+                    entries0 = entries;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>(ctx);
+                    curFut.listenAsync(signalC);
+                }
+            }
+            // The detached batch is submitted outside of the synchronized section.
+            if (entries0 != null) {
+                submit(entries0, curFut0);
+
+                if (cancelled)
+                    curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this));
+            }
+            // NOTE(review): declared @Nullable, but the visible code always returns a non-null future.
+            return curFut0;
+        }
+
+        /**
+         * @return Fresh collection with some space for outgrowth.
+         */
+        private List<Map.Entry<K, V>> newEntries() {
+            return new ArrayList<>((int)(bufSize * 1.2));
+        }
+
+        /**
+         * Submits the current batch, if it is not empty, and gathers the futures held in locFuts and reqs.
+         * @return Compound future over the futures in locFuts and reqs, or {@code null} if there are none.
+         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+         */
+        @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
+            List<Map.Entry<K, V>> entries0 = null;
+            GridFutureAdapter<Object> curFut0 = null;
+
+            synchronized (this) {
+                if (!entries.isEmpty()) {
+                    entries0 = entries;
+                    curFut0 = curFut;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>(ctx);
+                    curFut.listenAsync(signalC);
+                }
+            }
+            // The detached batch is submitted outside of the synchronized section.
+            if (entries0 != null)
+                submit(entries0, curFut0);
+
+            // Create compound future for this flush.
+            GridCompoundFuture<Object, Object> res = null;
+
+            for (IgniteInternalFuture<Object> f : locFuts) {
+                if (res == null)
+                    res = new GridCompoundFuture<>(ctx);
+
+                res.add(f);
+            }
+
+            for (IgniteInternalFuture<Object> f : reqs.values()) {
+                if (res == null)
+                    res = new GridCompoundFuture<>(ctx);
+
+                res.add(f);
+            }
+
+            if (res != null)
+                res.markInitialized();
+
+            return res;
+        }
+
+        /**
+         * Increments active tasks count.
+         *
+         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+         */
+        private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
+            U.acquire(sem);
+        }
+
+        /**
+         * @param f Future that finished.
+         */
+        private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+            assert f != null;
+
+            sem.release();
+        }
+
+        /**
+         * Submits entries: runs a local update job if {@code isLocNode}, else sends a load request to {@code node}.
+         * @param entries Entries to submit.
+         * @param curFut Current future; completed by the local job listener or failed on marshal/deploy/send errors.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
+            throws IgniteInterruptedCheckedException {
+            assert entries != null;
+            assert !entries.isEmpty();
+            assert curFut != null;
+
+            incrementActiveTasks();
+
+            IgniteInternalFuture<Object> fut;
+
+            if (isLocNode) {
+                fut = ctx.closure().callLocalSafe(
+                    new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+                // Track the local job so that cancelAll() can cancel it.
+                locFuts.add(fut);
+
+                fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                    @Override public void apply(IgniteInternalFuture<Object> t) {
+                        try {
+                            boolean rmv = locFuts.remove(t);
+                            assert rmv;
+
+                            curFut.onDone(t.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            curFut.onDone(e);
+                        }
+                    }
+                });
+            }
+            else {
+                byte[] entriesBytes;
+
+                try {
+                    if (compact) {
+                        entriesBytes = ctx.config().getMarshaller()
+                            .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
+                    }
+                    else
+                        entriesBytes = ctx.config().getMarshaller().marshal(entries);
+                    // Updater and topic are marshalled only while their cached bytes are still null.
+                    if (updaterBytes == null) {
+                        assert updater != null;
+
+                        updaterBytes = ctx.config().getMarshaller().marshal(updater);
+                    }
+
+                    if (topicBytes == null)
+                        topicBytes = ctx.config().getMarshaller().marshal(topic);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to marshal (request will not be sent).", e);
+                    curFut.onDone(e); // Fail the future: it is not registered in 'reqs' yet.
+                    return;
+                }
+
+                GridDeployment dep = null;
+                GridPeerDeployAware jobPda0 = null;
+
+                if (ctx.deploy().enabled()) {
+                    try {
+                        jobPda0 = jobPda;
+
+                        assert jobPda0 != null;
+
+                        dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+
+                        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+                        if (cache != null)
+                            cache.context().deploy().onEnter();
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+                        curFut.onDone(e); // Fail the future: it is not registered in 'reqs' yet.
+                        return;
+                    }
+
+                    if (dep == null)
+                        U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+                }
+
+                long reqId = idGen.incrementAndGet();
+
+                fut = curFut;
+                // Registered before sending; onResponse() looks the future up by this request ID.
+                reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+
+                GridDataLoadRequest req = new GridDataLoadRequest(
+                    reqId,
+                    topicBytes,
+                    cacheName,
+                    updaterBytes,
+                    entriesBytes,
+                    true,
+                    skipStore,
+                    dep != null ? dep.deployMode() : null,
+                    dep != null ? jobPda0.deployClass().getName() : null,
+                    dep != null ? dep.userVersion() : null,
+                    dep != null ? dep.participants() : null,
+                    dep != null ? dep.classLoaderId() : null,
+                    dep == null);
+
+                try {
+                    ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+                }
+                catch (IgniteCheckedException e) {
+                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                        ((GridFutureAdapter<Object>)fut).onDone(e);
+                    else
+                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
+                            "request (node has left): " + node.id()));
+                }
+            }
+        }
+
+        /**
+         * Fails all futures in {@code reqs} and the current future with a {@link ClusterTopologyCheckedException}.
+         */
+        void onNodeLeft() {
+            assert !isLocNode;
+            assert bufMappings.get(node.id()) != this;
+
+            if (log.isDebugEnabled())
+                log.debug("Forcibly completing futures (node has left): " + node.id());
+
+            Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                "(node has left): " + node.id());
+
+            for (GridFutureAdapter<Object> f : reqs.values())
+                f.onDone(e);
+
+            // Make sure to complete current future (the field is read while holding this buffer's monitor).
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+            }
+
+            curFut0.onDone(e);
+        }
+
+        /**
+         * @param res Response; the {@code reqs} future registered under its request ID is removed and completed.
+         */
+        void onResponse(GridDataLoadResponse res) {
+            if (log.isDebugEnabled())
+                log.debug("Received data load response: " + res);
+
+            GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+            if (f == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Future for request has not been found: " + res.requestId());
+
+                return;
+            }
+
+            Throwable err = null;
+
+            byte[] errBytes = res.errorBytes();
+
+            if (errBytes != null) {
+                try {
+                    GridPeerDeployAware jobPda0 = jobPda;
+                    // Unmarshal with the peer-deploy class loader if jobPda is set, else with the grid class loader.
+                    err = ctx.config().getMarshaller().unmarshal(
+                        errBytes,
+                        jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
+
+                    return;
+                }
+            }
+            // 'err' is still null here when the response carries no error bytes.
+            f.onDone(null, err);
+
+            if (log.isDebugEnabled())
+                log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
+        }
+
+        /**
+         * Cancels every future in {@code locFuts} and fails every future in {@code reqs} with a cancellation error.
+         */
+        void cancelAll() {
+            IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this);
+            // Local jobs are cancelled one by one; a failed cancel is only logged.
+            for (IgniteInternalFuture<?> f : locFuts) {
+                try {
+                    f.cancel();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to cancel mini-future.", e);
+                }
+            }
+            // Futures of remote requests are completed with 'err'.
+            for (GridFutureAdapter<?> f : reqs.values())
+                f.onDone(err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            int size;
+            // 'entries' is read while holding this buffer's monitor.
+            synchronized (this) {
+                size = entries.size();
+            }
+
+            return S.toString(Buffer.class, this,
+                "entriesCnt", size,
+                "locFutsSize", locFuts.size(),
+                "reqsSize", reqs.size());
+        }
+    }
+
+    /**
+     * Data loader peer-deploy aware that resolves deploy class and class loader on first use and caches them.
+     */
+    private class DataLoaderPda implements GridPeerDeployAware {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Deploy class, {@code null} until the first {@link #deployClass()} call. */
+        private Class<?> cls;
+
+        /** Class loader, {@code null} until the first {@link #classLoader()} call. */
+        private ClassLoader ldr;
+
+        /** Collection of objects to detect deploy class and class loader. */
+        private Collection<Object> objs;
+
+        /**
+         * Constructs data loader peer-deploy aware.
+         * The varargs array is wrapped with {@code Arrays.asList}, not copied.
+         * @param objs Collection of objects to detect deploy class and class loader.
+         */
+        private DataLoaderPda(Object... objs) {
+            this.objs = Arrays.asList(objs);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Class<?> deployClass() {
+            if (cls == null) {
+                Class<?> cls0 = null;
+                // depCls wins if set; otherwise scan objs until a class that is not U.isJdk() is detected.
+                if (depCls != null)
+                    cls0 = depCls;
+                else {
+                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+                        Object o = it.next();
+
+                        if (o != null)
+                            cls0 = U.detectClass(o);
+                    }
+                    // Nothing but JDK classes (or nothing at all) detected: use the streamer class itself.
+                    if (cls0 == null || U.isJdk(cls0))
+                        cls0 = IgniteDataStreamerImpl.class;
+                }
+
+                assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
+
+                cls = cls0;
+            }
+
+            return cls;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClassLoader classLoader() {
+            if (ldr == null) {
+                ClassLoader ldr0 = deployClass().getClassLoader();
+
+                // Safety: fall back to the grid class loader when getClassLoader() returns null.
+                if (ldr0 == null)
+                    ldr0 = U.gridClassLoader();
+
+                assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
+
+                ldr = ldr0;
+            }
+
+            return ldr;
+        }
+    }
+
+    /**
+     * Mutable {@link Map.Entry} implementation that is externalized as the key followed by the value.
+     */
+    private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Key. */
+        private K key;
+
+        /** Value, may be {@code null}. */
+        private V val;
+
+        /**
+         * @param key Key (asserted to be non-null).
+         * @param val Value, may be {@code null}.
+         */
+        private Entry0(K key, @Nullable V val) {
+            assert key != null;
+
+            this.key = key;
+            this.val = val;
+        }
+
+        /**
+         * For {@link Externalizable}.
+         */
+        @SuppressWarnings("UnusedDeclaration")
+        public Entry0() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public K getKey() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V setValue(V val) {
+            V old = this.val;
+
+            this.val = val;
+
+            return old;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(key);
+            out.writeObject(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            key = (K)in.readObject();
+            val = (V)in.readObject();
+        }
+    }
+
+    /**
+     * Wrapper collection with compact serialization of map entries: entry count, then key/value pairs.
+     */
+    private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** Wrapped delegate; replaced by a new {@code ArrayList} in {@link #readExternal}. */
+        private Collection<Map.Entry<K, V>> delegate;
+
+        /** Optional portable processor for converting keys and values on write; not set by {@link #readExternal}. */
+        private GridPortableProcessor portable;
+
+        /**
+         * @param delegate Delegate.
+         * @param portable Portable processor, or {@code null} to write keys and values as they are.
+         */
+        private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
+            this.delegate = delegate;
+            this.portable = portable;
+        }
+
+        /**
+         * For {@link Externalizable}.
+         */
+        public Entries0() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<Entry<K, V>> iterator() {
+            return delegate.iterator();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return delegate.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(delegate.size());
+            // After the size, key and value of each entry follow in the delegate's iteration order.
+            boolean portableEnabled = portable != null;
+
+            for (Map.Entry<K, V> entry : delegate) {
+                if (portableEnabled) {
+                    out.writeObject(portable.marshalToPortable(entry.getKey()));
+                    out.writeObject(portable.marshalToPortable(entry.getValue()));
+                }
+                else {
+                    out.writeObject(entry.getKey());
+                    out.writeObject(entry.getValue());
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            int sz = in.readInt();
+            // Entries are materialized as Entry0 instances in an ArrayList sized by the count just read.
+            delegate = new ArrayList<>(sz);
+
+            for (int i = 0; i < sz; i++) {
+                Object k = in.readObject();
+                Object v = in.readObject();
+
+                delegate.add(new Entry0<>((K)k, (V)v));
+            }
+        }
+    }
+
+    /**
+     * Isolated updater which only loads entry initial value; the cache is cast to {@code IgniteCacheProxy}.
+     */
+    private static class IsolatedUpdater<K, V> implements Updater<K, V> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
+
+            GridCacheAdapter<K, V> internalCache = proxy.context().cache();
+            // For a near cache the update goes to the underlying DHT cache.
+            if (internalCache.isNear())
+                internalCache = internalCache.context().near().dht();
+
+            GridCacheContext<K, V> cctx = internalCache.context();
+            // Topology version and cache version are obtained once and shared by all entries of the batch.
+            long topVer = cctx.affinity().affinityTopologyVersion();
+
+            GridCacheVersion ver = cctx.versions().next(topVer);
+
+            boolean portable = cctx.portableEnabled();
+
+            for (Map.Entry<K, V> e : entries) {
+                try {
+                    K key = e.getKey();
+                    V val = e.getValue();
+
+                    if (portable) {
+                        key = (K)cctx.marshalToPortable(key);
+                        val = (V)cctx.marshalToPortable(val);
+                    }
+
+                    GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
+
+                    entry.unswap(true, false);
+
+                    entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
+                        GridDrType.DR_LOAD);
+
+                    cctx.evicts().touch(entry, topVer);
+                }
+                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
+                    // No-op: the entry is skipped and the loop continues.
+                }
+                catch (IgniteCheckedException ex) {
+                    IgniteLogger log = cache.unwrap(Ignite.class).log();
+                    // Failure is only logged; remaining entries are still processed.
+                    U.error(log, "Failed to set initial value for cache entry: " + e, ex);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
index b8cfe77..e34eb16 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/GridDrDataLoadCacheUpdater.java
@@ -31,7 +31,7 @@ import java.util.*;
 /**
  * Data center replication cache updater for data loader.
  */
-public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataLoader.Updater<K, V> {
+public class GridDrDataLoadCacheUpdater<K, V> implements IgniteDataStreamer.Updater<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e960422..49b78f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -302,8 +302,8 @@ public class IgfsDataManager extends IgfsManager {
      *
      * @return New instance of data loader.
      */
-    private IgniteDataLoader<IgfsBlockKey, byte[]> dataLoader() {
-        IgniteDataLoader<IgfsBlockKey, byte[]> ldr =
+    private IgniteDataStreamer<IgfsBlockKey, byte[]> dataLoader() {
+        IgniteDataStreamer<IgfsBlockKey, byte[]> ldr =
             igfsCtx.kernalContext().<IgfsBlockKey, byte[]>dataLoad().dataLoader(dataCachePrj.name());
 
         IgfsConfiguration cfg = igfsCtx.configuration();
@@ -641,7 +641,7 @@ public class IgfsDataManager extends IgfsManager {
                 ", cleanNonColocated=" + cleanNonColocated + ", startIdx=" + startIdx + ", endIdx=" + endIdx + ']');
 
         try {
-            try (IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader()) {
+            try (IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataLoader()) {
                 for (long idx = startIdx; idx <= endIdx; idx++) {
                     ldr.removeData(new IgfsBlockKey(fileInfo.id(), range.affinityKey(), fileInfo.evictExclude(),
                         idx));
@@ -667,7 +667,7 @@ public class IgfsDataManager extends IgfsManager {
         long endIdx = range.endOffset() / fileInfo.blockSize();
 
         try {
-            try (IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader()) {
+            try (IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataLoader()) {
                 long bytesProcessed = 0;
 
                 for (long idx = startIdx; idx <= endIdx; idx++) {
@@ -1705,7 +1705,7 @@ public class IgfsDataManager extends IgfsManager {
                         break;
                     }
 
-                    IgniteDataLoader<IgfsBlockKey, byte[]> ldr = dataLoader();
+                    IgniteDataStreamer<IgfsBlockKey, byte[]> ldr = dataLoader();
 
                     try {
                         IgfsFileMap map = fileInfo.fileMap();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
index 4f5da0b..b60b76b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxConsistencyRestartAbstractSelfTest.java
@@ -99,7 +99,7 @@ public abstract class IgniteTxConsistencyRestartAbstractSelfTest extends GridCom
     public void testTxConsistency() throws Exception {
         startGridsMultiThreaded(GRID_CNT);
 
-        IgniteDataLoader<Object, Object> ldr = grid(0).dataLoader(null);
+        IgniteDataStreamer<Object, Object> ldr = grid(0).dataLoader(null);
 
         for (int i = 0; i < RANGE; i++) {
             ldr.addData(i, 0);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
index 8950f45..414fc42 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedHitsAndMissesSelfTest.java
@@ -140,7 +140,7 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
      * @param g Grid.
      */
     private static void realTimePopulate(final Ignite g) {
-        try (IgniteDataLoader<Integer, Long> ldr = g.dataLoader(null)) {
+        try (IgniteDataStreamer<Integer, Long> ldr = g.dataLoader(null)) {
             // Sets max values to 1 so cache metrics have correct values.
             ldr.perNodeParallelLoadOperations(1);
 
@@ -155,7 +155,7 @@ public class GridCachePartitionedHitsAndMissesSelfTest extends GridCommonAbstrac
     /**
      * Increments value for key.
      */
-    private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> {
+    private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> {
         /** */
         private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() {
             @Override public Void process(MutableEntry<Integer, Long> e, Object... args) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
index ea8f060..ecb2411 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheLruNearEvictionPolicySelfTest.java
@@ -108,7 +108,7 @@ public class GridCacheLruNearEvictionPolicySelfTest extends GridCommonAbstractTe
 
             info("Inserting " + cnt + " keys to cache.");
 
-            try (IgniteDataLoader<Integer, String> ldr = grid(0).dataLoader(null)) {
+            try (IgniteDataStreamer<Integer, String> ldr = grid(0).dataLoader(null)) {
                 for (int i = 0; i < cnt; i++)
                     ldr.addData(i, Integer.toString(i));
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
index a381082..5ac6dae 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/lru/GridCacheNearOnlyLruNearEvictionPolicySelfTest.java
@@ -141,7 +141,7 @@ public class GridCacheNearOnlyLruNearEvictionPolicySelfTest extends GridCommonAb
 
             info("Inserting " + cnt + " keys to cache.");
 
-            try (IgniteDataLoader<Integer, String> ldr = grid(1).dataLoader(null)) {
+            try (IgniteDataStreamer<Integer, String> ldr = grid(1).dataLoader(null)) {
                 for (int i = 0; i < cnt; i++)
                     ldr.addData(i, Integer.toString(i));
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
index e433856..501e56c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.dataload;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -88,7 +87,7 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest {
 
             Ignite g4 = grid(4);
 
-            IgniteDataLoader<Object, Object> dataLdr = g4.dataLoader(null);
+            IgniteDataStreamer<Object, Object> dataLdr = g4.dataLoader(null);
 
             dataLdr.perNodeBufferSize(32);
 
@@ -135,7 +134,7 @@ public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest {
             else
                 fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
 
-            IgniteDataLoader<Integer, String> dataLdr = g0.dataLoader(null);
+            IgniteDataStreamer<Integer, String> dataLdr = g0.dataLoader(null);
 
             Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
index 1945912..fc1a1cb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
@@ -137,7 +137,7 @@ public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest {
 
             Ignite ignite = startGrid();
 
-            final IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(null);
+            final IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(null);
 
             ldr.perNodeBufferSize(8192);
             ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
index a666423..5959957 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java
@@ -19,16 +19,15 @@ package org.apache.ignite.internal.processors.dataload;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.GridCache;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.store.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.optimized.*;
 import org.apache.ignite.spi.discovery.tcp.*;
@@ -37,7 +36,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.jetbrains.annotations.*;
 
-import javax.cache.Cache;
+import javax.cache.*;
 import javax.cache.configuration.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -177,7 +176,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
             Ignite g2 = startGrid(2);
             startGrid(3);
 
-            final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null);
+            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataLoader(null);
 
             ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
 
@@ -219,7 +218,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
 
             assertEquals(total, s2 + s3);
 
-            final IgniteDataLoader<Integer, Integer> rmvLdr = g2.dataLoader(null);
+            final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataLoader(null);
 
             rmvLdr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
 
@@ -299,7 +298,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
             final int cnt = 40_000;
             final int threads = 10;
 
-            try (final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null)) {
+            try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataLoader(null)) {
                 final AtomicInteger idxGen = new AtomicInteger();
 
                 IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
@@ -359,7 +358,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
                 new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
                 new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
 
-            IgniteDataLoader<Object, Object> dataLdr = g1.dataLoader(null);
+            IgniteDataStreamer<Object, Object> dataLdr = g1.dataLoader(null);
 
             for (int i = 0, size = arrays.size(); i < 1000; i++) {
                 Object arr = arrays.get(i % size);
@@ -417,7 +416,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
             Ignite g1 = grid(1);
 
             // Get and configure loader.
-            final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null);
+            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataLoader(null);
 
             ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>individual());
             ldr.perNodeBufferSize(2);
@@ -519,7 +518,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
         try {
             Ignite g1 = startGrid(1);
 
-            IgniteDataLoader<Object, Object> ldr = g1.dataLoader(null);
+            IgniteDataStreamer<Object, Object> ldr = g1.dataLoader(null);
 
             ldr.close(false);
 
@@ -677,7 +676,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
 
             final IgniteCache<Integer, Integer> c = g.jcache(null);
 
-            final IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+            final IgniteDataStreamer<Integer, Integer> ldr = g.dataLoader(null);
 
             ldr.perNodeBufferSize(10);
 
@@ -729,7 +728,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
 
             IgniteCache<Integer, Integer> c = g.jcache(null);
 
-            IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+            IgniteDataStreamer<Integer, Integer> ldr = g.dataLoader(null);
 
             ldr.perNodeBufferSize(10);
 
@@ -776,7 +775,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
 
             assertTrue(c.localSize() == 0);
 
-            IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null);
+            IgniteDataStreamer<Integer, Integer> ldr = g.dataLoader(null);
 
             ldr.perNodeBufferSize(10);
             ldr.autoFlushFrequency(3000);
@@ -821,7 +820,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
             for (int i = 0; i < 1000; i++)
                 storeMap.put(i, i);
 
-            try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
+            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataLoader(null)) {
                 ldr.allowOverwrite(true);
 
                 assertFalse(ldr.skipStore());
@@ -839,7 +838,7 @@ public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest {
             for (int i = 1000; i < 2000; i++)
                 assertEquals(i, storeMap.get(i));
 
-            try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) {
+            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataLoader(null)) {
                 ldr.allowOverwrite(true);
 
                 ldr.skipStore(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
index e8ffedf..769f201 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/colocation/GridTestMain.java
@@ -180,7 +180,7 @@ public class GridTestMain {
         ExecutorCompletionService<Object> execSvc =
             new ExecutorCompletionService<>(Executors.newFixedThreadPool(numThreads));
 
-        try (IgniteDataLoader<GridTestKey, Long> ldr = G.ignite().dataLoader("partitioned")) {
+        try (IgniteDataStreamer<GridTestKey, Long> ldr = G.ignite().dataLoader("partitioned")) {
             for (int i = 0; i < numThreads; i++) {
                 final int threadId = i;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
index 8615e10..4ec0d82 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/discovery/GridGcTimeoutTest.java
@@ -40,7 +40,7 @@ public class GridGcTimeoutTest {
     public static void main(String[] args) {
         Ignite g = G.start(U.resolveIgniteUrl(CFG_PATH));
 
-        IgniteDataLoader<Long, String> ldr = g.dataLoader(null);
+        IgniteDataStreamer<Long, String> ldr = g.dataLoader(null);
 
         ldr.perNodeBufferSize(16 * 1024);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
index 4e6d9b5..50c7eba 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest1.java
@@ -33,7 +33,7 @@ public class GridContinuousMapperLoadTest1 {
         try (Ignite g = G.start("examples/config/example-cache.xml")) {
             int max = 30000;
 
-            IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated");
+            IgniteDataStreamer<Integer, TestObject> ldr = g.dataLoader("replicated");
 
             for (int i = 0; i < max; i++)
                 ldr.addData(i, new TestObject(i, "Test object: " + i));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
index c9ba9ab..c6f364a 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/mapper/GridContinuousMapperLoadTest2.java
@@ -57,7 +57,7 @@ public class GridContinuousMapperLoadTest2 {
         try {
             int max = 20000;
 
-            IgniteDataLoader<Integer, TestObject> ldr = g.dataLoader("replicated");
+            IgniteDataStreamer<Integer, TestObject> ldr = g.dataLoader("replicated");
 
             for (int i = 0; i < max; i++)
                 ldr.addData(i, new TestObject(i, "Test object: " + i));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
index 58478d3..d4dcf22 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteMock.java
@@ -22,7 +22,7 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.*;
 import org.jetbrains.annotations.*;
 
@@ -171,7 +171,7 @@ public class IgniteMock implements Ignite {
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+    @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
----------------------------------------------------------------------
diff --git a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
index 6f5553b..53d6f26 100644
--- a/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
+++ b/modules/scalar/src/main/scala/org/apache/ignite/scalar/scalar.scala
@@ -17,9 +17,6 @@
 
 package org.apache.ignite.scalar
 
-import java.net.URL
-import java.util.UUID
-
 import org.apache.ignite._
 import org.apache.ignite.cache.query.annotations.{QuerySqlField, QueryTextField}
 import org.apache.ignite.cluster.ClusterNode
@@ -27,6 +24,9 @@ import org.apache.ignite.configuration.IgniteConfiguration
 import org.apache.ignite.internal.IgniteVersionUtils._
 import org.jetbrains.annotations.Nullable
 
+import java.net.URL
+import java.util.UUID
+
 import scala.annotation.meta.field
 
 /**
@@ -294,7 +294,7 @@ object scalar extends ScalarConversions {
      */
     @inline def dataLoader$[K, V](
         @Nullable cacheName: String,
-        bufSize: Int): IgniteDataLoader[K, V] = {
+        bufSize: Int): IgniteDataStreamer[K, V] = {
         val dl = ignite$.dataLoader[K, V](cacheName)
 
         dl.perNodeBufferSize(bufSize)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 31aa9e5..f9f6e9e 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -247,7 +247,7 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+    @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
         assert g != null;
 
         return g.dataLoader(cacheName);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
index e26ab9f..b70d618 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryBenchmark.java
@@ -40,7 +40,7 @@ public class IgniteSqlQueryBenchmark extends IgniteCacheAbstractBenchmark {
 
         long start = System.nanoTime();
 
-        try (IgniteDataLoader<Integer, Person> dataLdr = ignite().dataLoader(cache.getName())) {
+        try (IgniteDataStreamer<Integer, Person> dataLdr = ignite().dataLoader(cache.getName())) {
             for (int i = 0; i < args.range() && !Thread.currentThread().isInterrupted(); i++) {
                 dataLdr.addData(i, new Person(i, "firstName" + i, "lastName" + i, i * 1000));
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
index 4e34d14..f6ba47a 100644
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteSqlQueryJoinBenchmark.java
@@ -39,7 +39,7 @@ public class IgniteSqlQueryJoinBenchmark extends IgniteCacheAbstractBenchmark {
 
         long start = System.nanoTime();
 
-        try (IgniteDataLoader<Object, Object> dataLdr = ignite().dataLoader(cache.getName())) {
+        try (IgniteDataStreamer<Object, Object> dataLdr = ignite().dataLoader(cache.getName())) {
             final int orgRange = args.range() / 10;
 
             // Populate organizations.


[2/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
deleted file mode 100644
index ed3bbcb..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java
+++ /dev/null
@@ -1,1453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dr.*;
-import org.apache.ignite.internal.processors.portable.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.Map.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- * Data loader implementation.
- */
-@SuppressWarnings("unchecked")
-public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delayed {
-    /** Isolated updater. */
-    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
-
-    /** Cache updater. */
-    private Updater<K, V> updater = ISOLATED_UPDATER;
-
-    /** */
-    private byte[] updaterBytes;
-
-    /** Max remap count before issuing an error. */
-    private static final int DFLT_MAX_REMAP_CNT = 32;
-
-    /** Log reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Cache name ({@code null} for default cache). */
-    private final String cacheName;
-
-    /** Portable enabled flag. */
-    private final boolean portableEnabled;
-
-    /**
-     *  If {@code true} then data will be transferred in compact format (only keys and values).
-     *  Otherwise full map entry will be transferred (this is requires by DR internal logic).
-     */
-    private final boolean compact;
-
-    /** Per-node buffer size. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
-
-    /** */
-    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
-
-    /** */
-    private long autoFlushFreq;
-
-    /** Mapping. */
-    @GridToStringInclude
-    private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr;
-
-    /** Context. */
-    private final GridKernalContext ctx;
-
-    /** Communication topic for responses. */
-    private final Object topic;
-
-    /** */
-    private byte[] topicBytes;
-
-    /** {@code True} if data loader has been cancelled. */
-    private volatile boolean cancelled;
-
-    /** Active futures of this data loader. */
-    @GridToStringInclude
-    private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
-
-    /** Closure to remove from active futures. */
-    @GridToStringExclude
-    private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
-        @Override public void apply(IgniteInternalFuture<?> t) {
-            boolean rmv = activeFuts.remove(t);
-
-            assert rmv;
-        }
-    };
-
-    /** Job peer deploy aware. */
-    private volatile GridPeerDeployAware jobPda;
-
-    /** Deployment class. */
-    private Class<?> depCls;
-
-    /** Future to track loading finish. */
-    private final GridFutureAdapter<?> fut;
-
-    /** Public API future to track loading finish. */
-    private final IgniteFuture<?> publicFut;
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** Closed flag. */
-    private final AtomicBoolean closed = new AtomicBoolean();
-
-    /** */
-    private volatile long lastFlushTime = U.currentTimeMillis();
-
-    /** */
-    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ;
-
-    /** */
-    private boolean skipStore;
-
-    /** */
-    private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
-
-    /**
-     * @param ctx Grid kernal context.
-     * @param cacheName Cache name.
-     * @param flushQ Flush queue.
-     * @param compact If {@code true} data is transferred in compact mode (only keys and values).
-     *                Otherwise full map entry will be transferred (this is required by DR internal logic).
-     */
-    public IgniteDataLoaderImpl(
-        final GridKernalContext ctx,
-        @Nullable final String cacheName,
-        DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ,
-        boolean compact
-    ) {
-        assert ctx != null;
-
-        this.ctx = ctx;
-        this.cacheName = cacheName;
-        this.flushQ = flushQ;
-        this.compact = compact;
-
-        log = U.logger(ctx, logRef, IgniteDataLoaderImpl.class);
-
-        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
-        if (node == null)
-            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
-
-        portableEnabled = ctx.portable().portableEnabled(node, cacheName);
-
-        discoLsnr = new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
-                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
-                UUID id = discoEvt.eventNode().id();
-
-                // Remap regular mappings.
-                final Buffer buf = bufMappings.remove(id);
-
-                if (buf != null) {
-                    // Only async notification is possible since
-                    // discovery thread may be trapped otherwise.
-                    ctx.closure().callLocalSafe(
-                        new Callable<Object>() {
-                            @Override public Object call() throws Exception {
-                                buf.onNodeLeft();
-
-                                return null;
-                            }
-                        },
-                        true /* system pool */
-                    );
-                }
-            }
-        };
-
-        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
-
-        // Generate unique topic for this loader.
-        topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
-
-        ctx.io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof GridDataLoadResponse;
-
-                GridDataLoadResponse res = (GridDataLoadResponse)msg;
-
-                if (log.isDebugEnabled())
-                    log.debug("Received data load response: " + res);
-
-                Buffer buf = bufMappings.get(nodeId);
-
-                if (buf != null)
-                    buf.onResponse(res);
-
-                else if (log.isDebugEnabled())
-                    log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
-            }
-        });
-
-        if (log.isDebugEnabled())
-            log.debug("Added response listener within topic: " + topic);
-
-        fut = new GridDataLoaderFuture(ctx, this);
-
-        publicFut = new IgniteFutureImpl<>(fut);
-    }
-
-    /**
-     * Enters busy lock.
-     */
-    private void enterBusy() {
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Data loader has been closed.");
-    }
-
-    /**
-     * Leaves busy lock.
-     */
-    private void leaveBusy() {
-        busyLock.leaveBusy();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> future() {
-        return publicFut;
-    }
-
-    /**
-     * @return Internal future.
-     */
-    public IgniteInternalFuture<?> internalFuture() {
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void deployClass(Class<?> depCls) {
-        this.depCls = depCls;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updater(Updater<K, V> updater) {
-        A.notNull(updater, "updater");
-
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean allowOverwrite() {
-        return updater != ISOLATED_UPDATER;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void allowOverwrite(boolean allow) {
-        if (allow == allowOverwrite())
-            return;
-
-        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
-        if (node == null)
-            throw new IgniteException("Failed to get node for cache: " + cacheName);
-
-        updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipStore() {
-        return skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void skipStore(boolean skipStore) {
-        this.skipStore = skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public String cacheName() {
-        return cacheName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int perNodeBufferSize() {
-        return bufSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void perNodeBufferSize(int bufSize) {
-        A.ensure(bufSize > 0, "bufSize > 0");
-
-        this.bufSize = bufSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int perNodeParallelLoadOperations() {
-        return parallelOps;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void perNodeParallelLoadOperations(int parallelOps) {
-        this.parallelOps = parallelOps;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long autoFlushFrequency() {
-        return autoFlushFreq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void autoFlushFrequency(long autoFlushFreq) {
-        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
-
-        long old = this.autoFlushFreq;
-
-        if (autoFlushFreq != old) {
-            this.autoFlushFreq = autoFlushFreq;
-
-            if (autoFlushFreq != 0 && old == 0)
-                flushQ.add(this);
-            else if (autoFlushFreq == 0)
-                flushQ.remove(this);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
-        A.notNull(entries, "entries");
-
-        return addData(entries.entrySet());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
-        A.notEmpty(entries, "entries");
-
-        enterBusy();
-
-        try {
-            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
-
-            resFut.listenAsync(rmvActiveFut);
-
-            activeFuts.add(resFut);
-
-            Collection<K> keys = null;
-
-            if (entries.size() > 1) {
-                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
-
-                for (Map.Entry<K, V> entry : entries)
-                    keys.add(entry.getKey());
-            }
-
-            load0(entries, resFut, keys, 0);
-
-            return new IgniteFutureImpl<>(resFut);
-        }
-        catch (IgniteException e) {
-            return new IgniteFinishedFutureImpl<>(ctx, e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
-        A.notNull(entry, "entry");
-
-        return addData(F.asList(entry));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(K key, V val) {
-        A.notNull(key, "key");
-
-        return addData(new Entry0<>(key, val));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> removeData(K key) {
-        return addData(key, null);
-    }
-
-    /**
-     * @param entries Entries.
-     * @param resFut Result future.
-     * @param activeKeys Active keys.
-     * @param remaps Remaps count.
-     */
-    private void load0(
-        Collection<? extends Map.Entry<K, V>> entries,
-        final GridFutureAdapter<Object> resFut,
-        @Nullable final Collection<K> activeKeys,
-        final int remaps
-    ) {
-        assert entries != null;
-
-        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
-
-        boolean initPda = ctx.deploy().enabled() && jobPda == null;
-
-        for (Map.Entry<K, V> entry : entries) {
-            List<ClusterNode> nodes;
-
-            try {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                if (initPda) {
-                    jobPda = new DataLoaderPda(key, entry.getValue(), updater);
-
-                    initPda = false;
-                }
-
-                nodes = nodes(key);
-            }
-            catch (IgniteCheckedException e) {
-                resFut.onDone(e);
-
-                return;
-            }
-
-            if (F.isEmpty(nodes)) {
-                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
-                    "(no nodes with cache found in topology) [infos=" + entries.size() +
-                    ", cacheName=" + cacheName + ']'));
-
-                return;
-            }
-
-            for (ClusterNode node : nodes) {
-                Collection<Map.Entry<K, V>> col = mappings.get(node);
-
-                if (col == null)
-                    mappings.put(node, col = new ArrayList<>());
-
-                col.add(entry);
-            }
-        }
-
-        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
-            final UUID nodeId = e.getKey().id();
-
-            Buffer buf = bufMappings.get(nodeId);
-
-            if (buf == null) {
-                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
-
-                if (old != null)
-                    buf = old;
-            }
-
-            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
-
-            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> t) {
-                    try {
-                        t.get();
-
-                        if (activeKeys != null) {
-                            for (Map.Entry<K, V> e : entriesForNode)
-                                activeKeys.remove(e.getKey());
-
-                            if (activeKeys.isEmpty())
-                                resFut.onDone();
-                        }
-                        else {
-                            assert entriesForNode.size() == 1;
-
-                            // That has been a single key,
-                            // so complete result future right away.
-                            resFut.onDone();
-                        }
-                    }
-                    catch (IgniteCheckedException e1) {
-                        if (log.isDebugEnabled())
-                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
-
-                        if (cancelled) {
-                            resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " +
-                                IgniteDataLoaderImpl.this, e1));
-                        }
-                        else if (remaps + 1 > maxRemapCnt) {
-                            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
-                                + remaps), e1);
-                        }
-                        else
-                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
-                    }
-                }
-            };
-
-            GridFutureAdapter<?> f;
-
-            try {
-                f = buf.update(entriesForNode, lsnr);
-            }
-            catch (IgniteInterruptedCheckedException e1) {
-                resFut.onDone(e1);
-
-                return;
-            }
-
-            if (ctx.discovery().node(nodeId) == null) {
-                if (bufMappings.remove(nodeId, buf))
-                    buf.onNodeLeft();
-
-                if (f != null)
-                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
-                        "(node has left): " + nodeId));
-            }
-        }
-    }
-
-    /**
-     * @param key Key to map.
-     * @return Nodes to send requests to.
-     * @throws IgniteCheckedException If failed.
-     */
-    private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
-        GridAffinityProcessor aff = ctx.affinity();
-
-        return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
-            Collections.singletonList(aff.mapKeyToNode(cacheName, key));
-    }
-
-    /**
-     * Performs flush.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    private void doFlush() throws IgniteCheckedException {
-        lastFlushTime = U.currentTimeMillis();
-
-        List<IgniteInternalFuture> activeFuts0 = null;
-
-        int doneCnt = 0;
-
-        for (IgniteInternalFuture<?> f : activeFuts) {
-            if (!f.isDone()) {
-                if (activeFuts0 == null)
-                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
-
-                activeFuts0.add(f);
-            }
-            else {
-                f.get();
-
-                doneCnt++;
-            }
-        }
-
-        if (activeFuts0 == null || activeFuts0.isEmpty())
-            return;
-
-        while (true) {
-            Queue<IgniteInternalFuture<?>> q = null;
-
-            for (Buffer buf : bufMappings.values()) {
-                IgniteInternalFuture<?> flushFut = buf.flush();
-
-                if (flushFut != null) {
-                    if (q == null)
-                        q = new ArrayDeque<>(bufMappings.size() * 2);
-
-                    q.add(flushFut);
-                }
-            }
-
-            if (q != null) {
-                assert !q.isEmpty();
-
-                boolean err = false;
-
-                for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
-                    try {
-                        fut.get();
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to flush buffer: " + e);
-
-                        err = true;
-                    }
-                }
-
-                if (err)
-                    // Remaps needed - flush buffers.
-                    continue;
-            }
-
-            doneCnt = 0;
-
-            for (int i = 0; i < activeFuts0.size(); i++) {
-                IgniteInternalFuture f = activeFuts0.get(i);
-
-                if (f == null)
-                    doneCnt++;
-                else if (f.isDone()) {
-                    f.get();
-
-                    doneCnt++;
-
-                    activeFuts0.set(i, null);
-                }
-                else
-                    break;
-            }
-
-            if (doneCnt == activeFuts0.size())
-                return;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public void flush() throws IgniteException {
-        enterBusy();
-
-        try {
-            doFlush();
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Flushes every internal buffer if buffer was flushed before passed in
-     * threshold.
-     * <p>
-     * Does not wait for result and does not fail on errors assuming that this method
-     * should be called periodically.
-     */
-    @Override public void tryFlush() throws IgniteInterruptedException {
-        if (!busyLock.enterBusy())
-            return;
-
-        try {
-            for (Buffer buf : bufMappings.values())
-                buf.flush();
-
-            lastFlushTime = U.currentTimeMillis();
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param cancel {@code True} to close with cancellation.
-     * @throws IgniteException If failed.
-     */
-    @Override public void close(boolean cancel) throws IgniteException {
-        try {
-            closeEx(cancel);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-    }
-
-    /**
-     * @param cancel {@code True} to close with cancellation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void closeEx(boolean cancel) throws IgniteCheckedException {
-        if (!closed.compareAndSet(false, true))
-            return;
-
-        busyLock.block();
-
-        if (log.isDebugEnabled())
-            log.debug("Closing data loader [ldr=" + this + ", cancel=" + cancel + ']');
-
-        IgniteCheckedException e = null;
-
-        try {
-            // Assuming that no methods are called on this loader after this method is called.
-            if (cancel) {
-                cancelled = true;
-
-                for (Buffer buf : bufMappings.values())
-                    buf.cancelAll();
-            }
-            else
-                doFlush();
-
-            ctx.event().removeLocalEventListener(discoLsnr);
-
-            ctx.io().removeMessageListener(topic);
-        }
-        catch (IgniteCheckedException e0) {
-            e = e0;
-        }
-
-        fut.onDone(null, e);
-
-        if (e != null)
-            throw e;
-    }
-
-    /**
-     * @return {@code true} If the loader is closed.
-     */
-    boolean isClosed() {
-        return fut.isDone();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteException {
-        close(false);
-    }
-
-    /**
-     * @return Max remap count.
-     */
-    public int maxRemapCount() {
-        return maxRemapCnt;
-    }
-
-    /**
-     * @param maxRemapCnt New max remap count.
-     */
-    public void maxRemapCount(int maxRemapCnt) {
-        this.maxRemapCnt = maxRemapCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteDataLoaderImpl.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getDelay(TimeUnit unit) {
-        return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @return Next flush time.
-     */
-    private long nextFlushTime() {
-        return lastFlushTime + autoFlushFreq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(Delayed o) {
-        return nextFlushTime() > ((IgniteDataLoaderImpl)o).nextFlushTime() ? 1 : -1;
-    }
-
-    /**
-     *
-     */
-    private class Buffer {
-        /** Node. */
-        private final ClusterNode node;
-
-        /** Active futures. */
-        private final Collection<IgniteInternalFuture<Object>> locFuts;
-
-        /** Buffered entries. */
-        private List<Map.Entry<K, V>> entries;
-
-        /** */
-        @GridToStringExclude
-        private GridFutureAdapter<Object> curFut;
-
-        /** Local node flag. */
-        private final boolean isLocNode;
-
-        /** ID generator. */
-        private final AtomicLong idGen = new AtomicLong();
-
-        /** Active futures. */
-        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
-
-        /** */
-        private final Semaphore sem;
-
-        /** Closure to signal on task finish. */
-        @GridToStringExclude
-        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
-            @Override public void apply(IgniteInternalFuture<Object> t) {
-                signalTaskFinished(t);
-            }
-        };
-
-        /**
-         * @param node Node.
-         */
-        Buffer(ClusterNode node) {
-            assert node != null;
-
-            this.node = node;
-
-            locFuts = new GridConcurrentHashSet<>();
-            reqs = new ConcurrentHashMap8<>();
-
-            // Cache local node flag.
-            isLocNode = node.equals(ctx.discovery().localNode());
-
-            entries = newEntries();
-            curFut = new GridFutureAdapter<>(ctx);
-            curFut.listenAsync(signalC);
-
-            sem = new Semaphore(parallelOps);
-        }
-
-        /**
-         * @param newEntries Infos.
-         * @param lsnr Listener for the operation future.
-         * @throws IgniteInterruptedCheckedException If failed.
-         * @return Future for operation.
-         */
-        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
-            IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
-            List<Map.Entry<K, V>> entries0 = null;
-            GridFutureAdapter<Object> curFut0;
-
-            synchronized (this) {
-                curFut0 = curFut;
-
-                curFut0.listenAsync(lsnr);
-
-                for (Map.Entry<K, V> entry : newEntries)
-                    entries.add(entry);
-
-                if (entries.size() >= bufSize) {
-                    entries0 = entries;
-
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>(ctx);
-                    curFut.listenAsync(signalC);
-                }
-            }
-
-            if (entries0 != null) {
-                submit(entries0, curFut0);
-
-                if (cancelled)
-                    curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this));
-            }
-
-            return curFut0;
-        }
-
-        /**
-         * @return Fresh collection with some space for outgrowth.
-         */
-        private List<Map.Entry<K, V>> newEntries() {
-            return new ArrayList<>((int)(bufSize * 1.2));
-        }
-
-        /**
-         * @return Future if any submitted.
-         *
-         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
-         */
-        @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
-            List<Map.Entry<K, V>> entries0 = null;
-            GridFutureAdapter<Object> curFut0 = null;
-
-            synchronized (this) {
-                if (!entries.isEmpty()) {
-                    entries0 = entries;
-                    curFut0 = curFut;
-
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>(ctx);
-                    curFut.listenAsync(signalC);
-                }
-            }
-
-            if (entries0 != null)
-                submit(entries0, curFut0);
-
-            // Create compound future for this flush.
-            GridCompoundFuture<Object, Object> res = null;
-
-            for (IgniteInternalFuture<Object> f : locFuts) {
-                if (res == null)
-                    res = new GridCompoundFuture<>(ctx);
-
-                res.add(f);
-            }
-
-            for (IgniteInternalFuture<Object> f : reqs.values()) {
-                if (res == null)
-                    res = new GridCompoundFuture<>(ctx);
-
-                res.add(f);
-            }
-
-            if (res != null)
-                res.markInitialized();
-
-            return res;
-        }
-
-        /**
-         * Increments active tasks count.
-         *
-         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
-         */
-        private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
-            U.acquire(sem);
-        }
-
-        /**
-         * @param f Future that finished.
-         */
-        private void signalTaskFinished(IgniteInternalFuture<Object> f) {
-            assert f != null;
-
-            sem.release();
-        }
-
-        /**
-         * @param entries Entries to submit.
-         * @param curFut Current future.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
-            throws IgniteInterruptedCheckedException {
-            assert entries != null;
-            assert !entries.isEmpty();
-            assert curFut != null;
-
-            incrementActiveTasks();
-
-            IgniteInternalFuture<Object> fut;
-
-            if (isLocNode) {
-                fut = ctx.closure().callLocalSafe(
-                    new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
-
-                locFuts.add(fut);
-
-                fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
-                    @Override public void apply(IgniteInternalFuture<Object> t) {
-                        try {
-                            boolean rmv = locFuts.remove(t);
-
-                            assert rmv;
-
-                            curFut.onDone(t.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            curFut.onDone(e);
-                        }
-                    }
-                });
-            }
-            else {
-                byte[] entriesBytes;
-
-                try {
-                    if (compact) {
-                        entriesBytes = ctx.config().getMarshaller()
-                            .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
-                    }
-                    else
-                        entriesBytes = ctx.config().getMarshaller().marshal(entries);
-
-                    if (updaterBytes == null) {
-                        assert updater != null;
-
-                        updaterBytes = ctx.config().getMarshaller().marshal(updater);
-                    }
-
-                    if (topicBytes == null)
-                        topicBytes = ctx.config().getMarshaller().marshal(topic);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to marshal (request will not be sent).", e);
-
-                    return;
-                }
-
-                GridDeployment dep = null;
-                GridPeerDeployAware jobPda0 = null;
-
-                if (ctx.deploy().enabled()) {
-                    try {
-                        jobPda0 = jobPda;
-
-                        assert jobPda0 != null;
-
-                        dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
-
-                        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-
-                        if (cache != null)
-                            cache.context().deploy().onEnter();
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
-
-                        return;
-                    }
-
-                    if (dep == null)
-                        U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
-                }
-
-                long reqId = idGen.incrementAndGet();
-
-                fut = curFut;
-
-                reqs.put(reqId, (GridFutureAdapter<Object>)fut);
-
-                GridDataLoadRequest req = new GridDataLoadRequest(
-                    reqId,
-                    topicBytes,
-                    cacheName,
-                    updaterBytes,
-                    entriesBytes,
-                    true,
-                    skipStore,
-                    dep != null ? dep.deployMode() : null,
-                    dep != null ? jobPda0.deployClass().getName() : null,
-                    dep != null ? dep.userVersion() : null,
-                    dep != null ? dep.participants() : null,
-                    dep != null ? dep.classLoaderId() : null,
-                    dep == null);
-
-                try {
-                    ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
-                }
-                catch (IgniteCheckedException e) {
-                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
-                        ((GridFutureAdapter<Object>)fut).onDone(e);
-                    else
-                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
-                            "request (node has left): " + node.id()));
-                }
-            }
-        }
-
-        /**
-         *
-         */
-        void onNodeLeft() {
-            assert !isLocNode;
-            assert bufMappings.get(node.id()) != this;
-
-            if (log.isDebugEnabled())
-                log.debug("Forcibly completing futures (node has left): " + node.id());
-
-            Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
-                "(node has left): " + node.id());
-
-            for (GridFutureAdapter<Object> f : reqs.values())
-                f.onDone(e);
-
-            // Make sure to complete current future.
-            GridFutureAdapter<Object> curFut0;
-
-            synchronized (this) {
-                curFut0 = curFut;
-            }
-
-            curFut0.onDone(e);
-        }
-
-        /**
-         * @param res Response.
-         */
-        void onResponse(GridDataLoadResponse res) {
-            if (log.isDebugEnabled())
-                log.debug("Received data load response: " + res);
-
-            GridFutureAdapter<?> f = reqs.remove(res.requestId());
-
-            if (f == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Future for request has not been found: " + res.requestId());
-
-                return;
-            }
-
-            Throwable err = null;
-
-            byte[] errBytes = res.errorBytes();
-
-            if (errBytes != null) {
-                try {
-                    GridPeerDeployAware jobPda0 = jobPda;
-
-                    err = ctx.config().getMarshaller().unmarshal(
-                        errBytes,
-                        jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
-                }
-                catch (IgniteCheckedException e) {
-                    f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
-
-                    return;
-                }
-            }
-
-            f.onDone(null, err);
-
-            if (log.isDebugEnabled())
-                log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
-        }
-
-        /**
-         *
-         */
-        void cancelAll() {
-            IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataLoaderImpl.this);
-
-            for (IgniteInternalFuture<?> f : locFuts) {
-                try {
-                    f.cancel();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to cancel mini-future.", e);
-                }
-            }
-
-            for (GridFutureAdapter<?> f : reqs.values())
-                f.onDone(err);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            int size;
-
-            synchronized (this) {
-                size = entries.size();
-            }
-
-            return S.toString(Buffer.class, this,
-                "entriesCnt", size,
-                "locFutsSize", locFuts.size(),
-                "reqsSize", reqs.size());
-        }
-    }
-
-    /**
-     * Data loader peer-deploy aware.
-     */
-    private class DataLoaderPda implements GridPeerDeployAware {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Deploy class. */
-        private Class<?> cls;
-
-        /** Class loader. */
-        private ClassLoader ldr;
-
-        /** Collection of objects to detect deploy class and class loader. */
-        private Collection<Object> objs;
-
-        /**
-         * Constructs data loader peer-deploy aware.
-         *
-         * @param objs Collection of objects to detect deploy class and class loader.
-         */
-        private DataLoaderPda(Object... objs) {
-            this.objs = Arrays.asList(objs);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Class<?> deployClass() {
-            if (cls == null) {
-                Class<?> cls0 = null;
-
-                if (depCls != null)
-                    cls0 = depCls;
-                else {
-                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
-                        Object o = it.next();
-
-                        if (o != null)
-                            cls0 = U.detectClass(o);
-                    }
-
-                    if (cls0 == null || U.isJdk(cls0))
-                        cls0 = IgniteDataLoaderImpl.class;
-                }
-
-                assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
-
-                cls = cls0;
-            }
-
-            return cls;
-        }
-
-        /** {@inheritDoc} */
-        @Override public ClassLoader classLoader() {
-            if (ldr == null) {
-                ClassLoader ldr0 = deployClass().getClassLoader();
-
-                // Safety.
-                if (ldr0 == null)
-                    ldr0 = U.gridClassLoader();
-
-                assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
-
-                ldr = ldr0;
-            }
-
-            return ldr;
-        }
-    }
-
-    /**
-     * Entry.
-     */
-    private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private K key;
-
-        /** */
-        private V val;
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         */
-        private Entry0(K key, @Nullable V val) {
-            assert key != null;
-
-            this.key = key;
-            this.val = val;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        @SuppressWarnings("UnusedDeclaration")
-        public Entry0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public K getKey() {
-            return key;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V getValue() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V setValue(V val) {
-            V old = this.val;
-
-            this.val = val;
-
-            return old;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(key);
-            out.writeObject(val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            key = (K)in.readObject();
-            val = (V)in.readObject();
-        }
-    }
-
-    /**
-     * Wrapper list with special compact serialization of map entries.
-     */
-    private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**  Wrapped delegate. */
-        private Collection<Map.Entry<K, V>> delegate;
-
-        /** Optional portable processor for converting values. */
-        private GridPortableProcessor portable;
-
-        /**
-         * @param delegate Delegate.
-         * @param portable Portable processor.
-         */
-        private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
-            this.delegate = delegate;
-            this.portable = portable;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        public Entries0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<Entry<K, V>> iterator() {
-            return delegate.iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return delegate.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(delegate.size());
-
-            boolean portableEnabled = portable != null;
-
-            for (Map.Entry<K, V> entry : delegate) {
-                if (portableEnabled) {
-                    out.writeObject(portable.marshalToPortable(entry.getKey()));
-                    out.writeObject(portable.marshalToPortable(entry.getValue()));
-                }
-                else {
-                    out.writeObject(entry.getKey());
-                    out.writeObject(entry.getValue());
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            int sz = in.readInt();
-
-            delegate = new ArrayList<>(sz);
-
-            for (int i = 0; i < sz; i++) {
-                Object k = in.readObject();
-                Object v = in.readObject();
-
-                delegate.add(new Entry0<>((K)k, (V)v));
-            }
-        }
-    }
-
-    /**
-     * Isolated updater which only loads entry initial value.
-     */
-    private static class IsolatedUpdater<K, V> implements Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
-
-            GridCacheAdapter<K, V> internalCache = proxy.context().cache();
-
-            if (internalCache.isNear())
-                internalCache = internalCache.context().near().dht();
-
-            GridCacheContext<K, V> cctx = internalCache.context();
-
-            long topVer = cctx.affinity().affinityTopologyVersion();
-
-            GridCacheVersion ver = cctx.versions().next(topVer);
-
-            boolean portable = cctx.portableEnabled();
-
-            for (Map.Entry<K, V> e : entries) {
-                try {
-                    K key = e.getKey();
-                    V val = e.getValue();
-
-                    if (portable) {
-                        key = (K)cctx.marshalToPortable(key);
-                        val = (V)cctx.marshalToPortable(val);
-                    }
-
-                    GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
-
-                    entry.unswap(true, false);
-
-                    entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
-                        GridDrType.DR_LOAD);
-
-                    cctx.evicts().touch(entry, topVer);
-                }
-                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
-                    // No-op.
-                }
-                catch (IgniteCheckedException ex) {
-                    IgniteLogger log = cache.unwrap(Ignite.class).log();
-
-                    U.error(log, "Failed to set initial value for cache entry: " + e, ex);
-                }
-            }
-        }
-    }
-}


[3/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl

Posted by sb...@apache.org.
# ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b33b651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b33b651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b33b651

Branch: refs/heads/ignite-394
Commit: 9b33b6510f5b82c30c8e75a66eb328b00bc425e4
Parents: 6909cc4
Author: Artem Shutak <as...@gridgain.com>
Authored: Tue Mar 3 21:40:19 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Tue Mar 3 21:40:19 2015 +0300

----------------------------------------------------------------------
 .../datagrid/CacheDataLoaderExample.java        |    6 +-
 .../datagrid/CachePopularNumbersExample.java    |    4 +-
 .../src/main/java/org/apache/ignite/Ignite.java |    6 +-
 .../org/apache/ignite/IgniteDataLoader.java     |  379 -----
 .../org/apache/ignite/IgniteDataStreamer.java   |  379 +++++
 .../apache/ignite/internal/IgniteKernal.java    |    2 +-
 .../processors/cache/GridCacheAdapter.java      |   10 +-
 .../GridDistributedCacheAdapter.java            |    4 +-
 .../dataload/GridDataLoadCacheUpdaters.java     |   18 +-
 .../dataload/GridDataLoadUpdateJob.java         |    4 +-
 .../dataload/GridDataLoaderFuture.java          |    4 +-
 .../dataload/GridDataLoaderProcessor.java       |   16 +-
 .../dataload/IgniteDataLoaderImpl.java          | 1453 ------------------
 .../dataload/IgniteDataStreamerImpl.java        | 1453 ++++++++++++++++++
 .../dr/GridDrDataLoadCacheUpdater.java          |    2 +-
 .../processors/igfs/IgfsDataManager.java        |   10 +-
 ...iteTxConsistencyRestartAbstractSelfTest.java |    2 +-
 ...idCachePartitionedHitsAndMissesSelfTest.java |    4 +-
 .../GridCacheLruNearEvictionPolicySelfTest.java |    2 +-
 ...heNearOnlyLruNearEvictionPolicySelfTest.java |    2 +-
 .../dataload/GridDataLoaderImplSelfTest.java    |    5 +-
 .../dataload/GridDataLoaderPerformanceTest.java |    2 +-
 .../GridDataLoaderProcessorSelfTest.java        |   27 +-
 .../loadtests/colocation/GridTestMain.java      |    2 +-
 .../loadtests/discovery/GridGcTimeoutTest.java  |    2 +-
 .../mapper/GridContinuousMapperLoadTest1.java   |    2 +-
 .../mapper/GridContinuousMapperLoadTest2.java   |    2 +-
 .../ignite/testframework/junits/IgniteMock.java |    4 +-
 .../scala/org/apache/ignite/scalar/scalar.scala |    8 +-
 .../org/apache/ignite/IgniteSpringBean.java     |    2 +-
 .../cache/IgniteSqlQueryBenchmark.java          |    2 +-
 .../cache/IgniteSqlQueryJoinBenchmark.java      |    2 +-
 32 files changed, 1909 insertions(+), 1911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
index 57b0cd2..4cdbfd4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
@@ -21,8 +21,8 @@ import org.apache.ignite.*;
 import org.apache.ignite.examples.*;
 
 /**
- * Demonstrates how cache can be populated with data utilizing {@link IgniteDataLoader} API.
- * {@link IgniteDataLoader} is a lot more efficient to use than standard
+ * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API.
+ * {@link IgniteDataStreamer} is a lot more efficient to use than standard
  * {@code put(...)} operation as it properly buffers cache requests
  * together and properly manages load on remote nodes.
  * <p>
@@ -63,7 +63,7 @@ public class CacheDataLoaderExample {
 
             long start = System.currentTimeMillis();
 
-            try (IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
+            try (IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
                 // Configure loader.
                 ldr.perNodeBufferSize(1024);
                 ldr.perNodeParallelLoadOperations(8);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
index 0f71681..1fc737b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
@@ -92,7 +92,7 @@ public class CachePopularNumbersExample {
      * @throws IgniteException If failed.
      */
     private static void streamData(final Ignite ignite) throws IgniteException {
-        try (IgniteDataLoader<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
+        try (IgniteDataStreamer<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
             // Set larger per-node buffer size since our state is relatively small.
             ldr.perNodeBufferSize(2048);
 
@@ -140,7 +140,7 @@ public class CachePopularNumbersExample {
     /**
      * Increments value for key.
      */
-    private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> {
+    private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> {
         /** */
         private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() {
             @Override public Void process(MutableEntry<Integer, Long> e, Object... args) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 8851d8f..44d4ba9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -42,7 +42,7 @@ import java.util.concurrent.*;
  * In addition to {@link ClusterGroup} functionality, from here you can get the following:
  * <ul>
  * <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed cache.</li>
- * <li>{@link IgniteDataLoader} - functionality for loading data large amounts of data into cache.</li>
+ * <li>{@link IgniteDataStreamer} - functionality for loading large amounts of data into cache.</li>
  * <li>{@link IgniteFs} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li>
  * <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li>
  * <li>{@link IgniteScheduler} - functionality for scheduling jobs using UNIX Cron syntax.</li>
@@ -205,12 +205,12 @@ public interface Ignite extends AutoCloseable {
     /**
      * Gets a new instance of data loader associated with given cache name. Data loader
      * is responsible for loading external data into in-memory data grid. For more information
-     * refer to {@link IgniteDataLoader} documentation.
+     * refer to {@link IgniteDataStreamer} documentation.
      *
      * @param cacheName Cache name ({@code null} for default cache).
      * @return Data loader.
      */
-    public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName);
+    public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName);
 
     /**
      * Gets an instance of IGFS - Ignite In-Memory File System, if one is not

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
deleted file mode 100644
index 3cff287..0000000
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite;
-
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Data loader is responsible for loading external data into cache. It achieves it by
- * properly buffering updates and properly mapping keys to nodes responsible for the data
- * to make sure that there is the least amount of data movement possible and optimal
- * network and memory utilization.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- * <p>
- * Also note that {@code GridDataLoader} is not the only way to load data into cache.
- * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
- * method to load data from underlying data store. You can also use standard
- * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
- * likely will not perform as well as this class for loading data. And finally,
- * data can be loaded from underlying data store on demand, whenever it is accessed -
- * for this no explicit data loading step is needed.
- * <p>
- * {@code IgniteDataLoader} supports the following configuration properties:
- * <ul>
- *  <li>
- *      {@link #perNodeBufferSize(int)} - when entries are added to data loader via
- *      {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
- *      away and are buffered internally for better performance and network utilization.
- *      This setting controls the size of internal per-node buffer before buffered data
- *      is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
- *      value.
- *  </li>
- *  <li>
- *      {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
- *      to the data loader via {@link #addData(Object, Object)} method faster than it can
- *      be put in cache. In this case, new buffered load messages are sent to remote nodes
- *      before responses from previous ones are received. This could cause unlimited heap
- *      memory utilization growth on local and remote nodes. To control memory utilization,
- *      this setting limits maximum allowed number of parallel buffered load messages that
- *      are being processed on remote nodes. If this number is exceeded, then
- *      {@link #addData(Object, Object)} method will block to control memory utilization.
- *      Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
- *  </li>
- *  <li>
- *      {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
- *      this is the time after which the loader will make an attempt to submit all data
- *      added so far to remote nodes. Note that there is no guarantee that data will be
- *      delivered after this concrete attempt (e.g., it can fail when topology is
- *      changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
- *  </li>
- *  <li>
- *      {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
- *      updates and allow data loader choose most optimal concurrent implementation.
- *  </li>
- *  <li>
- *      {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries.
- *      It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
- *  </li>
- *  <li>
- *      {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
- *      loaded by a data loader must be class-loadable from the same class-loader.
- *      Ignite will make the best effort to detect the most suitable class-loader
- *      for data loading. However, in complex cases, where compound or deeply nested
- *      class-loaders are used, it is best to specify a deploy class which can be any
- *      class loaded by the class-loader for given data.
- *  </li>
- * </ul>
- */
-public interface IgniteDataLoader<K, V> extends AutoCloseable {
-    /** Default max concurrent put operations count. */
-    public static final int DFLT_MAX_PARALLEL_OPS = 16;
-
-    /** Default per node buffer size. */
-    public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
-
-    /**
-     * Name of cache to load data to.
-     *
-     * @return Cache name or {@code null} for default cache.
-     */
-    public String cacheName();
-
-    /**
-     * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
-     * Default is {@code true}.
-     *
-     * @return Flag value.
-     */
-    public boolean allowOverwrite();
-
-    /**
-     * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
-     * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method.
-     * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
-     *
-     * @param allowOverwrite Flag value.
-     * @throws IgniteException If failed.
-     */
-    public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
-
-    /**
-     * Gets flag indicating that write-through behavior should be disabled for data loading.
-     * Default is {@code false}.
-     *
-     * @return Skip store flag.
-     */
-    public boolean skipStore();
-
-    /**
-     * Sets flag indicating that write-through behavior should be disabled for data loading.
-     * Default is {@code false}.
-     *
-     * @param skipStore Skip store flag.
-     */
-    public void skipStore(boolean skipStore);
-
-    /**
-     * Gets size of per node key-value pairs buffer.
-     *
-     * @return Per node buffer size.
-     */
-    public int perNodeBufferSize();
-
-    /**
-     * Sets size of per node key-value pairs buffer.
-     * <p>
-     * This method should be called prior to {@link #addData(Object, Object)} call.
-     * <p>
-     * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
-     *
-     * @param bufSize Per node buffer size.
-     */
-    public void perNodeBufferSize(int bufSize);
-
-    /**
-     * Gets maximum number of parallel load operations for a single node.
-     *
-     * @return Maximum number of parallel load operations for a single node.
-     */
-    public int perNodeParallelLoadOperations();
-
-    /**
-     * Sets maximum number of parallel load operations for a single node.
-     * <p>
-     * This method should be called prior to {@link #addData(Object, Object)} call.
-     * <p>
-     * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
-     *
-     * @param parallelOps Maximum number of parallel load operations for a single node.
-     */
-    public void perNodeParallelLoadOperations(int parallelOps);
-
-    /**
-     * Gets automatic flush frequency. Essentially, this is the time after which the
-     * loader will make an attempt to submit all data added so far to remote nodes.
-     * Note that there is no guarantee that data will be delivered after this concrete
-     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
-     * <p>
-     * If set to {@code 0}, automatic flush is disabled.
-     * <p>
-     * Automatic flush is disabled by default (default value is {@code 0}).
-     *
-     * @return Flush frequency or {@code 0} if automatic flush is disabled.
-     * @see #flush()
-     */
-    public long autoFlushFrequency();
-
-    /**
-     * Sets automatic flush frequency. Essentially, this is the time after which the
-     * loader will make an attempt to submit all data added so far to remote nodes.
-     * Note that there is no guarantee that data will be delivered after this concrete
-     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
-     * <p>
-     * If set to {@code 0}, automatic flush is disabled.
-     * <p>
-     * Automatic flush is disabled by default (default value is {@code 0}).
-     *
-     * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
-     * @see #flush()
-     */
-    public void autoFlushFrequency(long autoFlushFreq);
-
-    /**
-     * Gets future for this loading process. This future completes whenever method
-     * {@link #close(boolean)} completes. By attaching listeners to this future
-     * it is possible to get asynchronous notifications for completion of this
-     * loading process.
-     *
-     * @return Future for this loading process.
-     */
-    public IgniteFuture<?> future();
-
-    /**
-     * Optional deploy class for peer deployment. All classes loaded by a data loader
-     * must be class-loadable from the same class-loader. Ignite will make the best
-     * effort to detect the most suitable class-loader for data loading. However,
-     * in complex cases, where compound or deeply nested class-loaders are used,
-     * it is best to specify a deploy class which can be any class loaded by
-     * the class-loader for given data.
-     *
-     * @param depCls Any class loaded by the class-loader for given data.
-     */
-    public void deployClass(Class<?> depCls);
-
-    /**
-     * Sets custom cache updater to this data loader.
-     *
-     * @param updater Cache updater.
-     */
-    public void updater(Updater<K, V> updater);
-
-    /**
-     * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
-     *
-     * @param key Key.
-     * @return Future fo this operation.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     */
-    public IgniteFuture<?> removeData(K key)  throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param key Key.
-     * @param val Value or {@code null} if respective entry must be removed from cache.
-     * @return Future fo this operation.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     */
-    public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
-        IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param entry Entry.
-     * @return Future fo this operation.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     */
-    public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
-        IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param entries Collection of entries to be loaded.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @return Future for this load operation.
-     */
-    public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
-
-    /**
-     * Adds data for loading on remote node. This method can be called from multiple
-     * threads in parallel to speed up loading if needed.
-     * <p>
-     * Note that loader will load data concurrently by multiple internal threads, so the
-     * data may get to remote nodes in different order from which it was added to
-     * the loader.
-     *
-     * @param entries Map to be loaded.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @return Future for this load operation.
-     */
-    public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
-
-    /**
-     * Loads any remaining data, but doesn't close the loader. Data can be still added after
-     * flush is finished. This method blocks and doesn't allow to add any data until all data
-     * is loaded.
-     * <p>
-     * If another thread is already performing flush, this method will block, wait for
-     * another thread to complete flush and exit. If you don't want to wait in this case,
-     * use {@link #tryFlush()} method.
-     *
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @see #tryFlush()
-     */
-    public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
-    /**
-     * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
-     * with the difference that it won't wait and will exit immediately.
-     *
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     * @throws IllegalStateException If grid has been concurrently stopped or
-     *      {@link #close(boolean)} has already been called on loader.
-     * @see #flush()
-     */
-    public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
-    /**
-     * Loads any remaining data and closes this loader.
-     *
-     * @param cancel {@code True} to cancel ongoing loading operations.
-     * @throws IgniteException If failed to map key to node.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     */
-    public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
-
-    /**
-     * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
-     * <p>
-     * The method is invoked automatically on objects managed by the
-     * {@code try-with-resources} statement.
-     *
-     * @throws IgniteException If failed to close data loader.
-     * @throws IgniteInterruptedException If thread has been interrupted.
-     */
-    @Override public void close() throws IgniteException, IgniteInterruptedException;
-
-    /**
-     * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)}
-     * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
-     * performance custom user-defined implementation may help.
-     * <p>
-     * Data loader can be configured to use custom implementation of updater instead of default one using
-     * {@link IgniteDataLoader#updater(IgniteDataLoader.Updater)} method.
-     */
-    interface Updater<K, V> extends Serializable {
-        /**
-         * Updates cache with batch of entries.
-         *
-         * @param cache Cache.
-         * @param entries Collection of entries.
-         * @throws IgniteException If failed.
-         */
-        public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
new file mode 100644
index 0000000..c48d61a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Data loader is responsible for loading external data into cache. It achieves it by
+ * properly buffering updates and properly mapping keys to nodes responsible for the data
+ * to make sure that there is the least amount of data movement possible and optimal
+ * network and memory utilization.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ * <p>
+ * Also note that {@code IgniteDataStreamer} is not the only way to load data into cache.
+ * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
+ * method to load data from underlying data store. You can also use standard
+ * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
+ * likely will not perform as well as this class for loading data. And finally,
+ * data can be loaded from underlying data store on demand, whenever it is accessed -
+ * for this no explicit data loading step is needed.
+ * <p>
+ * {@code IgniteDataStreamer} supports the following configuration properties:
+ * <ul>
+ *  <li>
+ *      {@link #perNodeBufferSize(int)} - when entries are added to data loader via
+ *      {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
+ *      away and are buffered internally for better performance and network utilization.
+ *      This setting controls the size of internal per-node buffer before buffered data
+ *      is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
+ *      value.
+ *  </li>
+ *  <li>
+ *      {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
+ *      to the data loader via {@link #addData(Object, Object)} method faster than it can
+ *      be put in cache. In this case, new buffered load messages are sent to remote nodes
+ *      before responses from previous ones are received. This could cause unlimited heap
+ *      memory utilization growth on local and remote nodes. To control memory utilization,
+ *      this setting limits maximum allowed number of parallel buffered load messages that
+ *      are being processed on remote nodes. If this number is exceeded, then
+ *      {@link #addData(Object, Object)} method will block to control memory utilization.
+ *      Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ *  </li>
+ *  <li>
+ *      {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
+ *      this is the time after which the loader will make an attempt to submit all data
+ *      added so far to remote nodes. Note that there is no guarantee that data will be
+ *      delivered after this concrete attempt (e.g., it can fail when topology is
+ *      changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
+ *  </li>
+ *  <li>
+ *      {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
+ *      updates and allows the data loader to choose the most optimal concurrent implementation.
+ *  </li>
+ *  <li>
+ *      {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with loaded entries.
+ *      It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
+ *  </li>
+ *  <li>
+ *      {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
+ *      loaded by a data loader must be class-loadable from the same class-loader.
+ *      Ignite will make the best effort to detect the most suitable class-loader
+ *      for data loading. However, in complex cases, where compound or deeply nested
+ *      class-loaders are used, it is best to specify a deploy class which can be any
+ *      class loaded by the class-loader for given data.
+ *  </li>
+ * </ul>
+ */
+public interface IgniteDataStreamer<K, V> extends AutoCloseable {
+    /** Default max concurrent put operations count. */
+    public static final int DFLT_MAX_PARALLEL_OPS = 16;
+
+    /** Default per node buffer size. */
+    public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
+
+    /**
+     * Name of cache to load data to.
+     *
+     * @return Cache name or {@code null} for default cache.
+     */
+    public String cacheName();
+
+    /**
+     * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
+     * Default is {@code true}.
+     *
+     * @return Flag value.
+     */
+    public boolean allowOverwrite();
+
+    /**
+     * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
+     * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method.
+     * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
+     *
+     * @param allowOverwrite Flag value.
+     * @throws IgniteException If failed.
+     */
+    public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
+
+    /**
+     * Gets flag indicating that write-through behavior should be disabled for data loading.
+     * Default is {@code false}.
+     *
+     * @return Skip store flag.
+     */
+    public boolean skipStore();
+
+    /**
+     * Sets flag indicating that write-through behavior should be disabled for data loading.
+     * Default is {@code false}.
+     *
+     * @param skipStore Skip store flag.
+     */
+    public void skipStore(boolean skipStore);
+
+    /**
+     * Gets size of per node key-value pairs buffer.
+     *
+     * @return Per node buffer size.
+     */
+    public int perNodeBufferSize();
+
+    /**
+     * Sets size of per node key-value pairs buffer.
+     * <p>
+     * This method should be called prior to {@link #addData(Object, Object)} call.
+     * <p>
+     * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
+     *
+     * @param bufSize Per node buffer size.
+     */
+    public void perNodeBufferSize(int bufSize);
+
+    /**
+     * Gets maximum number of parallel load operations for a single node.
+     *
+     * @return Maximum number of parallel load operations for a single node.
+     */
+    public int perNodeParallelLoadOperations();
+
+    /**
+     * Sets maximum number of parallel load operations for a single node.
+     * <p>
+     * This method should be called prior to {@link #addData(Object, Object)} call.
+     * <p>
+     * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+     *
+     * @param parallelOps Maximum number of parallel load operations for a single node.
+     */
+    public void perNodeParallelLoadOperations(int parallelOps);
+
+    /**
+     * Gets automatic flush frequency. Essentially, this is the time after which the
+     * loader will make an attempt to submit all data added so far to remote nodes.
+     * Note that there is no guarantee that data will be delivered after this concrete
+     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+     * <p>
+     * If set to {@code 0}, automatic flush is disabled.
+     * <p>
+     * Automatic flush is disabled by default (default value is {@code 0}).
+     *
+     * @return Flush frequency or {@code 0} if automatic flush is disabled.
+     * @see #flush()
+     */
+    public long autoFlushFrequency();
+
+    /**
+     * Sets automatic flush frequency. Essentially, this is the time after which the
+     * loader will make an attempt to submit all data added so far to remote nodes.
+     * Note that there is no guarantee that data will be delivered after this concrete
+     * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+     * <p>
+     * If set to {@code 0}, automatic flush is disabled.
+     * <p>
+     * Automatic flush is disabled by default (default value is {@code 0}).
+     *
+     * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
+     * @see #flush()
+     */
+    public void autoFlushFrequency(long autoFlushFreq);
+
+    /**
+     * Gets future for this loading process. This future completes whenever method
+     * {@link #close(boolean)} completes. By attaching listeners to this future
+     * it is possible to get asynchronous notifications for completion of this
+     * loading process.
+     *
+     * @return Future for this loading process.
+     */
+    public IgniteFuture<?> future();
+
+    /**
+     * Optional deploy class for peer deployment. All classes loaded by a data loader
+     * must be class-loadable from the same class-loader. Ignite will make the best
+     * effort to detect the most suitable class-loader for data loading. However,
+     * in complex cases, where compound or deeply nested class-loaders are used,
+     * it is best to specify a deploy class which can be any class loaded by
+     * the class-loader for given data.
+     *
+     * @param depCls Any class loaded by the class-loader for given data.
+     */
+    public void deployClass(Class<?> depCls);
+
+    /**
+     * Sets custom cache updater to this data loader.
+     *
+     * @param updater Cache updater.
+     */
+    public void updater(Updater<K, V> updater);
+
+    /**
+     * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
+     *
+     * @param key Key.
+     * @return Future for this operation.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     */
+    public IgniteFuture<?> removeData(K key)  throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param key Key.
+     * @param val Value or {@code null} if respective entry must be removed from cache.
+     * @return Future for this operation.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     */
+    public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
+        IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param entry Entry.
+     * @return Future for this operation.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     */
+    public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
+        IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param entries Collection of entries to be loaded.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @return Future for this load operation.
+     */
+    public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
+
+    /**
+     * Adds data for loading on remote node. This method can be called from multiple
+     * threads in parallel to speed up loading if needed.
+     * <p>
+     * Note that loader will load data concurrently by multiple internal threads, so the
+     * data may get to remote nodes in different order from which it was added to
+     * the loader.
+     *
+     * @param entries Map to be loaded.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @return Future for this load operation.
+     */
+    public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
+
+    /**
+     * Loads any remaining data, but doesn't close the loader. Data can still be added after
+     * flush is finished. This method blocks and doesn't allow any data to be added until all
+     * data is loaded.
+     * <p>
+     * If another thread is already performing flush, this method will block, wait for
+     * another thread to complete flush and exit. If you don't want to wait in this case,
+     * use {@link #tryFlush()} method.
+     *
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @see #tryFlush()
+     */
+    public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+    /**
+     * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
+     * with the difference that it won't wait and will exit immediately.
+     *
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     * @throws IllegalStateException If grid has been concurrently stopped or
+     *      {@link #close(boolean)} has already been called on loader.
+     * @see #flush()
+     */
+    public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+    /**
+     * Loads any remaining data and closes this loader.
+     *
+     * @param cancel {@code True} to cancel ongoing loading operations.
+     * @throws IgniteException If failed to map key to node.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     */
+    public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
+
+    /**
+     * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
+     * <p>
+     * The method is invoked automatically on objects managed by the
+     * {@code try-with-resources} statement.
+     *
+     * @throws IgniteException If failed to close data loader.
+     * @throws IgniteInterruptedException If thread has been interrupted.
+     */
+    @Override public void close() throws IgniteException, IgniteInterruptedException;
+
+    /**
+     * Updates cache with batch of entries. Usually it is enough to configure the {@link IgniteDataStreamer#allowOverwrite(boolean)}
+     * property, and an appropriate internal cache updater will be chosen automatically. In some cases, however, a custom
+     * user-defined implementation may help to achieve the best performance.
+     * <p>
+     * Data loader can be configured to use a custom updater implementation instead of the default one via the
+     * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method.
+     */
+    interface Updater<K, V> extends Serializable {
+        /**
+         * Updates cache with batch of entries.
+         *
+         * @param cache Cache.
+         * @param entries Collection of entries.
+         * @throws IgniteException If failed.
+         */
+        public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f46d071..336f872 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2346,7 +2346,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+    @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
         guard();
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 12ea535..6ed5699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3877,7 +3877,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+            IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
 
             try {
                 ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -4043,7 +4043,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
      * @throws IgniteCheckedException If failed.
      */
     private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
-        try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
+        try (final IgniteDataStreamer<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
             ldr.allowOverwrite(true);
             ldr.skipStore(true);
 
@@ -4086,7 +4086,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
 
         if (ctx.store().isLocalStore()) {
-            IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+            IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
 
             try {
                 ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -6134,7 +6134,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
         final Collection<Map.Entry<K, V>> col;
 
         /** */
-        final IgniteDataLoaderImpl<K, V> ldr;
+        final IgniteDataStreamerImpl<K, V> ldr;
 
         /** */
         final ExpiryPolicy plc;
@@ -6145,7 +6145,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
          * @param plc Optional expiry policy.
          */
         private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
-            IgniteDataLoaderImpl<K, V> ldr,
+            IgniteDataStreamerImpl<K, V> ldr,
             @Nullable ExpiryPolicy plc) {
             this.p = p;
             this.ldr = ldr;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 00190d9..c99efc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -276,8 +276,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 else
                     dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
 
-                try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) {
-                    ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0);
+                try (IgniteDataStreamer<K, V> dataLdr = ignite.dataLoader(cacheName)) {
+                    ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
 
                     dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
index e2e780b..78a7e62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
@@ -29,13 +29,13 @@ import java.util.*;
  */
 public class GridDataLoadCacheUpdaters {
     /** */
-    private static final IgniteDataLoader.Updater INDIVIDUAL = new Individual();
+    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
 
     /** */
-    private static final IgniteDataLoader.Updater BATCHED = new Batched();
+    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
 
     /** */
-    private static final IgniteDataLoader.Updater BATCHED_SORTED = new BatchedSorted();
+    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
 
     /**
      * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
@@ -44,7 +44,7 @@ public class GridDataLoadCacheUpdaters {
      *
      * @return Single updater.
      */
-    public static <K, V> IgniteDataLoader.Updater<K, V> individual() {
+    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
         return INDIVIDUAL;
     }
 
@@ -55,7 +55,7 @@ public class GridDataLoadCacheUpdaters {
      *
      * @return Batched updater.
      */
-    public static <K, V> IgniteDataLoader.Updater<K, V> batched() {
+    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
         return BATCHED;
     }
 
@@ -66,7 +66,7 @@ public class GridDataLoadCacheUpdaters {
      *
      * @return Batched sorted updater.
      */
-    public static <K extends Comparable<?>, V> IgniteDataLoader.Updater<K, V> batchedSorted() {
+    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
         return BATCHED_SORTED;
     }
 
@@ -93,7 +93,7 @@ public class GridDataLoadCacheUpdaters {
     /**
      * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
      */
-    private static class Individual<K, V> implements IgniteDataLoader.Updater<K, V> {
+    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -120,7 +120,7 @@ public class GridDataLoadCacheUpdaters {
     /**
      * Batched updater. Updates cache using batch operations thus is dead lock prone.
      */
-    private static class Batched<K, V> implements IgniteDataLoader.Updater<K, V> {
+    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -160,7 +160,7 @@ public class GridDataLoadCacheUpdaters {
     /**
      * Batched updater. Updates cache using batch operations thus is dead lock prone.
      */
-    private static class BatchedSorted<K, V> implements IgniteDataLoader.Updater<K, V> {
+    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
         /** */
         private static final long serialVersionUID = 0L;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
index 9e2a483..8aa554a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
@@ -48,7 +48,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
     private final boolean skipStore;
 
     /** */
-    private final IgniteDataLoader.Updater<K, V> updater;
+    private final IgniteDataStreamer.Updater<K, V> updater;
 
     /**
      * @param ctx Context.
@@ -65,7 +65,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
         Collection<Map.Entry<K, V>> col,
         boolean ignoreDepOwnership,
         boolean skipStore,
-        IgniteDataLoader.Updater<K, V> updater) {
+        IgniteDataStreamer.Updater<K, V> updater) {
         this.ctx = ctx;
         this.log = log;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
index 5efcfe9..dffa862 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
@@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
 
     /** Data loader. */
     @GridToStringExclude
-    private IgniteDataLoaderImpl dataLdr;
+    private IgniteDataStreamerImpl dataLdr;
 
     /**
      * Default constructor for {@link Externalizable} support.
@@ -47,7 +47,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
      * @param ctx Context.
      * @param dataLdr Data loader.
      */
-    GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
+    GridDataLoaderFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
         super(ctx);
 
         assert dataLdr != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
index d470d02..b29c9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
  */
 public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
     /** Loaders map (access is not supposed to be highly concurrent). */
-    private Collection<IgniteDataLoaderImpl> ldrs = new GridConcurrentHashSet<>();
+    private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -50,7 +50,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
     private Thread flusher;
 
     /** */
-    private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>();
+    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
 
     /** Marshaller. */
     private final Marshaller marsh;
@@ -80,7 +80,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
         flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
             @Override protected void body() throws InterruptedException {
                 while (!isCancelled()) {
-                    IgniteDataLoaderImpl<K, V> ldr = flushQ.take();
+                    IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
 
                     if (!busyLock.enterBusy())
                         return;
@@ -118,7 +118,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
         U.interrupt(flusher);
         U.join(flusher, log);
 
-        for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) {
+        for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
             if (log.isDebugEnabled())
                 log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
 
@@ -142,12 +142,12 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
      * @param compact {@code true} if data loader should transfer data in compact format.
      * @return Data loader.
      */
-    public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
+    public IgniteDataStreamerImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to create data loader (grid is stopping).");
 
         try {
-            final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);
+            final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
 
             ldrs.add(ldr);
 
@@ -173,7 +173,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
      * @param cacheName Cache name ({@code null} for default cache).
      * @return Data loader.
      */
-    public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+    public IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
         return dataLoader(cacheName, true);
     }
 
@@ -234,7 +234,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
             }
 
             Collection<Map.Entry<K, V>> col;
-            IgniteDataLoader.Updater<K, V> updater;
+            IgniteDataStreamer.Updater<K, V> updater;
 
             try {
                 col = marsh.unmarshal(req.collectionBytes(), clsLdr);