You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/04 14:44:05 UTC

[1/5] incubator-ignite git commit: # gg-9869

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-394 e5f686239 -> 9c8217c17


# gg-9869


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/96f426bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/96f426bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/96f426bb

Branch: refs/heads/ignite-394
Commit: 96f426bb0b5b3d19badac0610726f58ca4a0c15e
Parents: e5f6862
Author: Artem Shutak <as...@gridgain.com>
Authored: Wed Mar 4 15:24:33 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Wed Mar 4 15:24:33 2015 +0300

----------------------------------------------------------------------
 .../ignite/examples/CacheExamplesSelfTest.java  |   2 +-
 .../org/apache/ignite/IgniteDataStreamer.java   |   2 +-
 .../GridDistributedCacheAdapter.java            |   2 +-
 .../dataload/GridDataLoadCacheUpdaters.java     | 199 -------------------
 .../dataload/GridDataLoadUpdateJob.java         | 119 -----------
 .../IgniteDataStreamerCacheUpdaters.java        | 199 +++++++++++++++++++
 .../dataload/IgniteDataStreamerImpl.java        |  10 +-
 .../dataload/IgniteDataStreamerProcessor.java   |   2 +-
 .../dataload/IgniteDataStreamerUpdateJob.java   | 119 +++++++++++
 .../processors/igfs/IgfsDataManager.java        |   2 +-
 .../IgniteDataStreamerImplSelfTest.java         |   4 +-
 .../IgniteDataStreamerPerformanceTest.java      |   2 +-
 .../IgniteDataStreamerProcessorSelfTest.java    |  22 +-
 13 files changed, 342 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index c5c4599..14af44f 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -114,7 +114,7 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
     /**
      * @throws Exception If failed.
      */
-    public void testCacheDataLoaderExample() throws Exception {
+    public void testCacheDataStreamerExample() throws Exception {
         CacheDataStreamerExample.main(EMPTY_ARGS);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 22aa0c1..a47c079 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -33,7 +33,7 @@ import java.util.*;
  * data may get to remote nodes in different order from which it was added to
  * the loader.
  * <p>
- * Also note that {@code GridDataLoader} is not the only way to load data into cache.
+ * Also note that {@code IgniteDataStreamer} is not the only way to load data into cache.
  * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
  * method to load data from underlying data store. You can also use standard
  * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5791b8d..16419f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -279,7 +279,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                 try (IgniteDataStreamer<K, V> dataLdr = ignite.dataStreamer(cacheName)) {
                     ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
 
-                    dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());
+                    dataLdr.updater(IgniteDataStreamerCacheUpdaters.<K, V>batched());
 
                     for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) {
                         if (!locPart.isEmpty() && locPart.primary(topVer)) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
deleted file mode 100644
index 78a7e62..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Bundled factory for cache updaters.
- */
-public class GridDataLoadCacheUpdaters {
-    /** */
-    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
-
-    /**
-     * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
-     * is not the best.
-     *
-     * @return Single updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
-        return INDIVIDUAL;
-    }
-
-    /**
-     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
-     * updated concurrently. Performance is generally better than in {@link #individual()}.
-     *
-     * @return Batched updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
-        return BATCHED;
-    }
-
-    /**
-     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
-     * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
-     *
-     * @return Batched sorted updater.
-     */
-    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
-        return BATCHED_SORTED;
-    }
-
-    /**
-     * Updates cache.
-     *
-     * @param cache Cache.
-     * @param rmvCol Keys to remove.
-     * @param putMap Entries to put.
-     * @throws IgniteException If failed.
-     */
-    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
-        Map<K, V> putMap) {
-        assert rmvCol != null || putMap != null;
-
-        // Here we assume that there are no key duplicates, so the following calls are valid.
-        if (rmvCol != null)
-            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
-
-        if (putMap != null)
-            cache.putAll(putMap);
-    }
-
-    /**
-     * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
-     */
-    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null)
-                    cache.remove(key);
-                else
-                    cache.put(key, val);
-            }
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock prone.
-     */
-    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new HashSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new HashMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock prone.
-     */
-    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key instanceof Comparable;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new TreeSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new TreeMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
deleted file mode 100644
index 8aa554a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
-    /** */
-    private final GridKernalContext ctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** Cache name. */
-    private final String cacheName;
-
-    /** Entries to put. */
-    private final Collection<Map.Entry<K, V>> col;
-
-    /** {@code True} to ignore deployment ownership. */
-    private final boolean ignoreDepOwnership;
-
-    /** */
-    private final boolean skipStore;
-
-    /** */
-    private final IgniteDataStreamer.Updater<K, V> updater;
-
-    /**
-     * @param ctx Context.
-     * @param log Log.
-     * @param cacheName Cache name.
-     * @param col Entries to put.
-     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
-     * @param updater Updater.
-     */
-    GridDataLoadUpdateJob(
-        GridKernalContext ctx,
-        IgniteLogger log,
-        @Nullable String cacheName,
-        Collection<Map.Entry<K, V>> col,
-        boolean ignoreDepOwnership,
-        boolean skipStore,
-        IgniteDataStreamer.Updater<K, V> updater) {
-        this.ctx = ctx;
-        this.log = log;
-
-        assert col != null && !col.isEmpty();
-        assert updater != null;
-
-        this.cacheName = cacheName;
-        this.col = col;
-        this.ignoreDepOwnership = ignoreDepOwnership;
-        this.skipStore = skipStore;
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object call() throws Exception {
-        if (log.isDebugEnabled())
-            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
-
-//        TODO IGNITE-77: restore adapter usage.
-//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-//
-//        IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-//        if (!f.isDone())
-//            f.get();
-//
-//        if (ignoreDepOwnership)
-//            cache.context().deploy().ignoreOwnership(true);
-
-        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
-        if (skipStore)
-            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
-        if (ignoreDepOwnership)
-            cache.context().deploy().ignoreOwnership(true);
-
-        try {
-            updater.update(cache, col);
-
-            return null;
-        }
-        finally {
-            if (ignoreDepOwnership)
-                cache.context().deploy().ignoreOwnership(false);
-
-            if (log.isDebugEnabled())
-                log.debug("Update job finished on node: " + ctx.localNodeId());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..1742041
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class IgniteDataStreamerCacheUpdaters {
+    /** */
+    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
+
+    /**
+     * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
+     * is not the best.
+     *
+     * @return Single updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+        return INDIVIDUAL;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
+     * updated concurrently. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+        return BATCHED;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
+     * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched sorted updater.
+     */
+    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
+        return BATCHED_SORTED;
+    }
+
+    /**
+     * Updates cache.
+     *
+     * @param cache Cache.
+     * @param rmvCol Keys to remove.
+     * @param putMap Entries to put.
+     * @throws IgniteException If failed.
+     */
+    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
+        Map<K, V> putMap) {
+        assert rmvCol != null || putMap != null;
+
+        // Here we assume that there are no key duplicates, so the following calls are valid.
+        if (rmvCol != null)
+            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+        if (putMap != null)
+            cache.putAll(putMap);
+    }
+
+    /**
+     * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
+     */
+    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null)
+                    cache.remove(key);
+                else
+                    cache.put(key, val);
+            }
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock prone.
+     */
+    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new HashSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new HashMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock prone.
+     */
+    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key instanceof Comparable;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new TreeSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new TreeMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
index 80d08c8..1231e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
@@ -302,7 +302,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
         if (node == null)
             throw new IgniteException("Failed to get node for cache: " + cacheName);
 
-        updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
+        updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
     }
 
     /** {@inheritDoc} */
@@ -450,7 +450,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
                 assert key != null;
 
                 if (initPda) {
-                    jobPda = new DataLoaderPda(key, entry.getValue(), updater);
+                    jobPda = new DataStreamerPda(key, entry.getValue(), updater);
 
                     initPda = false;
                 }
@@ -981,7 +981,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
 
             if (isLocNode) {
                 fut = ctx.closure().callLocalSafe(
-                    new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+                    new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
 
                 locFuts.add(fut);
 
@@ -1193,7 +1193,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
     /**
      * Data streamer peer-deploy aware.
      */
-    private class DataLoaderPda implements GridPeerDeployAware {
+    private class DataStreamerPda implements GridPeerDeployAware {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -1211,7 +1211,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
          *
          * @param objs Collection of objects to detect deploy class and class loader.
          */
-        private DataLoaderPda(Object... objs) {
+        private DataStreamerPda(Object... objs) {
             this.objs = Arrays.asList(objs);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
index 69ea440..7db41e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
@@ -248,7 +248,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
                 return;
             }
 
-            GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx,
+            IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
                 log,
                 req.cacheName(),
                 col,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
new file mode 100644
index 0000000..1a3db40
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Entries to put. */
+    private final Collection<Map.Entry<K, V>> col;
+
+    /** {@code True} to ignore deployment ownership. */
+    private final boolean ignoreDepOwnership;
+
+    /** */
+    private final boolean skipStore;
+
+    /** */
+    private final IgniteDataStreamer.Updater<K, V> updater;
+
+    /**
+     * @param ctx Context.
+     * @param log Log.
+     * @param cacheName Cache name.
+     * @param col Entries to put.
+     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param updater Updater.
+     */
+    IgniteDataStreamerUpdateJob(
+        GridKernalContext ctx,
+        IgniteLogger log,
+        @Nullable String cacheName,
+        Collection<Map.Entry<K, V>> col,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDataStreamer.Updater<K, V> updater) {
+        this.ctx = ctx;
+        this.log = log;
+
+        assert col != null && !col.isEmpty();
+        assert updater != null;
+
+        this.cacheName = cacheName;
+        this.col = col;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object call() throws Exception {
+        if (log.isDebugEnabled())
+            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+
+//        TODO IGNITE-77: restore adapter usage.
+//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+//
+//        IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+//        if (!f.isDone())
+//            f.get();
+//
+//        if (ignoreDepOwnership)
+//            cache.context().deploy().ignoreOwnership(true);
+
+        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
+
+        if (skipStore)
+            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
+
+        if (ignoreDepOwnership)
+            cache.context().deploy().ignoreOwnership(true);
+
+        try {
+            updater.update(cache, col);
+
+            return null;
+        }
+        finally {
+            if (ignoreDepOwnership)
+                cache.context().deploy().ignoreOwnership(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Update job finished on node: " + ctx.localNodeId());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d585352..15309bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -314,7 +314,7 @@ public class IgfsDataManager extends IgfsManager {
         if (cfg.getPerNodeParallelBatchCount() > 0)
             ldr.perNodeParallelLoadOperations(cfg.getPerNodeParallelBatchCount());
 
-        ldr.updater(GridDataLoadCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted());
+        ldr.updater(IgniteDataStreamerCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted());
 
         return ldr;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
index b0d8625..306e615 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
@@ -36,7 +36,7 @@ import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 
 /**
- * Tests for {@code GridDataLoaderImpl}.
+ * Tests for {@code IgniteDataStreamerImpl}.
  */
 public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
     /** IP finder. */
@@ -69,7 +69,7 @@ public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testNullPointerExceptionUponDataLoaderClosing() throws Exception {
+    public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
         try {
             startGrids(5);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
index b3dd71b..22a1f97 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
@@ -140,7 +140,7 @@ public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
             final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
 
             ldr.perNodeBufferSize(8192);
-            ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted());
             ldr.autoFlushFrequency(0);
 
             final LongAdder cnt = new LongAdder();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
index 8eefebf..23a46e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
@@ -124,7 +124,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
     public void testPartitioned() throws Exception {
         mode = PARTITIONED;
 
-        checkDataLoader();
+        checkDataStreamer();
     }
 
     /**
@@ -134,7 +134,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
         mode = PARTITIONED;
         nearEnabled = false;
 
-        checkDataLoader();
+        checkDataStreamer();
     }
 
     /**
@@ -143,7 +143,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
     public void testReplicated() throws Exception {
         mode = REPLICATED;
 
-        checkDataLoader();
+        checkDataStreamer();
     }
 
     /**
@@ -153,7 +153,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
         mode = LOCAL;
 
         try {
-            checkDataLoader();
+            checkDataStreamer();
 
             assert false;
         }
@@ -167,7 +167,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
      * @throws Exception If failed.
      */
     @SuppressWarnings("ErrorNotRethrown")
-    private void checkDataLoader() throws Exception {
+    private void checkDataStreamer() throws Exception {
         try {
             Ignite g1 = startGrid(1);
 
@@ -178,7 +178,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
 
             final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
 
-            ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
 
             final AtomicInteger idxGen = new AtomicInteger();
             final int cnt = 400;
@@ -220,7 +220,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
 
             final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
 
-            rmvLdr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
+            rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
 
             final CountDownLatch l2 = new CountDownLatch(threads);
 
@@ -265,7 +265,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
     public void testPartitionedIsolated() throws Exception {
         mode = PARTITIONED;
 
-        checkIsolatedDataLoader();
+        checkIsolatedDataStreamer();
     }
 
     /**
@@ -274,13 +274,13 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
     public void testReplicatedIsolated() throws Exception {
         mode = REPLICATED;
 
-        checkIsolatedDataLoader();
+        checkIsolatedDataStreamer();
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void checkIsolatedDataLoader() throws Exception {
+    private void checkIsolatedDataStreamer() throws Exception {
         try {
             useCache = true;
 
@@ -418,7 +418,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
             // Get and configure loader.
             final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
 
-            ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>individual());
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
             ldr.perNodeBufferSize(2);
 
             // Define count of puts.


[2/5] incubator-ignite git commit: # gg-9869

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
deleted file mode 100644
index 23a46e5..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
+++ /dev/null
@@ -1,924 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- *
- */
-public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest {
-    /** */
-    private static ConcurrentHashMap<Object, Object> storeMap;
-
-    /** */
-    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private CacheMode mode = PARTITIONED;
-
-    /** */
-    private boolean nearEnabled = true;
-
-    /** */
-    private boolean useCache;
-
-    /** */
-    private TestStore store;
-
-    /** {@inheritDoc} */
-    @Override public void afterTest() throws Exception {
-        super.afterTest();
-
-        useCache = false;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(ipFinder);
-
-        cfg.setDiscoverySpi(spi);
-
-        cfg.setIncludeProperties();
-
-        cfg.setMarshaller(new OptimizedMarshaller(false));
-
-        if (useCache) {
-            CacheConfiguration cc = defaultCacheConfiguration();
-
-            cc.setCacheMode(mode);
-            cc.setAtomicityMode(TRANSACTIONAL);
-            cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-
-            cc.setEvictSynchronized(false);
-            cc.setEvictNearSynchronized(false);
-
-            if (store != null) {
-                cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
-                cc.setReadThrough(true);
-                cc.setWriteThrough(true);
-            }
-
-            cfg.setCacheConfiguration(cc);
-        }
-        else
-            cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitioned() throws Exception {
-        mode = PARTITIONED;
-
-        checkDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testColocated() throws Exception {
-        mode = PARTITIONED;
-        nearEnabled = false;
-
-        checkDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicated() throws Exception {
-        mode = REPLICATED;
-
-        checkDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLocal() throws Exception {
-        mode = LOCAL;
-
-        try {
-            checkDataStreamer();
-
-            assert false;
-        }
-        catch (IgniteCheckedException e) {
-            // Cannot load local cache configured remotely.
-            info("Caught expected exception: " + e);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    @SuppressWarnings("ErrorNotRethrown")
-    private void checkDataStreamer() throws Exception {
-        try {
-            Ignite g1 = startGrid(1);
-
-            useCache = true;
-
-            Ignite g2 = startGrid(2);
-            startGrid(3);
-
-            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
-            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
-            final AtomicInteger idxGen = new AtomicInteger();
-            final int cnt = 400;
-            final int threads = 10;
-
-            final CountDownLatch l1 = new CountDownLatch(threads);
-
-            IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
-                    for (int i = 0; i < cnt; i++) {
-                        int idx = idxGen.getAndIncrement();
-
-                        futs.add(ldr.addData(idx, idx));
-                    }
-
-                    l1.countDown();
-
-                    for (IgniteFuture<?> fut : futs)
-                        fut.get();
-
-                    return null;
-                }
-            }, threads);
-
-            l1.await();
-
-            // This will wait until data streamer finishes loading.
-            stopGrid(getTestGridName(1), false);
-
-            f1.get();
-
-            int s2 = internalCache(2).primaryKeySet().size();
-            int s3 = internalCache(3).primaryKeySet().size();
-            int total = threads * cnt;
-
-            assertEquals(total, s2 + s3);
-
-            final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
-
-            rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
-            final CountDownLatch l2 = new CountDownLatch(threads);
-
-            IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
-                    for (int i = 0; i < cnt; i++) {
-                        final int key = idxGen.decrementAndGet();
-
-                        futs.add(rmvLdr.removeData(key));
-                    }
-
-                    l2.countDown();
-
-                    for (IgniteFuture<?> fut : futs)
-                        fut.get();
-
-                    return null;
-                }
-            }, threads);
-
-            l2.await();
-
-            rmvLdr.close(false);
-
-            f2.get();
-
-            s2 = internalCache(2).primaryKeySet().size();
-            s3 = internalCache(3).primaryKeySet().size();
-
-            assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedIsolated() throws Exception {
-        mode = PARTITIONED;
-
-        checkIsolatedDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicatedIsolated() throws Exception {
-        mode = REPLICATED;
-
-        checkIsolatedDataStreamer();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void checkIsolatedDataStreamer() throws Exception {
-        try {
-            useCache = true;
-
-            Ignite g1 = startGrid(0);
-            startGrid(1);
-            startGrid(2);
-
-            awaitPartitionMapExchange();
-
-            GridCache<Integer, Integer> cache = ((IgniteKernal)grid(0)).cache(null);
-
-            for (int i = 0; i < 100; i++)
-                cache.put(i, -1);
-
-            final int cnt = 40_000;
-            final int threads = 10;
-
-            try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) {
-                final AtomicInteger idxGen = new AtomicInteger();
-
-                IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        for (int i = 0; i < cnt; i++) {
-                            int idx = idxGen.getAndIncrement();
-
-                            ldr.addData(idx, idx);
-                        }
-
-                        return null;
-                    }
-                }, threads);
-
-                f1.get();
-            }
-
-            for (int g = 0; g < 3; g++) {
-                ClusterNode locNode = grid(g).localNode();
-
-                GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null);
-
-                if (cache0.isNear())
-                    cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht();
-
-                CacheAffinity<Integer> aff = cache0.affinity();
-
-                for (int key = 0; key < cnt * threads; key++) {
-                    if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) {
-                        GridCacheEntryEx<Integer, Integer> entry = cache0.peekEx(key);
-
-                        assertNotNull("Missing entry for key: " + key, entry);
-                        assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false));
-                    }
-                }
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Test primitive arrays can be passed into data streamer.
-     *
-     * @throws Exception If failed.
-     */
-    public void testPrimitiveArrays() throws Exception {
-        try {
-            useCache = true;
-            mode = PARTITIONED;
-
-            Ignite g1 = startGrid(1);
-            startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
-
-            List<Object> arrays = Arrays.<Object>asList(
-                new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
-                new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
-
-            IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
-
-            for (int i = 0, size = arrays.size(); i < 1000; i++) {
-                Object arr = arrays.get(i % size);
-
-                dataLdr.addData(i, arr);
-                dataLdr.addData(i, fixedClosure(arr));
-            }
-
-            dataLdr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testReplicatedMultiThreaded() throws Exception {
-        mode = REPLICATED;
-
-        checkLoaderMultithreaded(1, 2);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPartitionedMultiThreaded() throws Exception {
-        mode = PARTITIONED;
-
-        checkLoaderMultithreaded(1, 3);
-    }
-
-    /**
-     * Tests loader in multithreaded environment with various count of grids started.
-     *
-     * @param nodesCntNoCache How many nodes should be started without cache.
-     * @param nodesCntCache How many nodes should be started with cache.
-     * @throws Exception If failed.
-     */
-    protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
-        throws Exception {
-        try {
-            // Start all required nodes.
-            int idx = 1;
-
-            for (int i = 0; i < nodesCntNoCache; i++)
-                startGrid(idx++);
-
-            useCache = true;
-
-            for (int i = 0; i < nodesCntCache; i++)
-                startGrid(idx++);
-
-            Ignite g1 = grid(1);
-
-            // Get and configure loader.
-            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
-            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
-            ldr.perNodeBufferSize(2);
-
-            // Define count of puts.
-            final AtomicInteger idxGen = new AtomicInteger();
-
-            final AtomicBoolean done = new AtomicBoolean();
-
-            try {
-                final int totalPutCnt = 50000;
-
-                IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        Collection<IgniteFuture<?>> futs = new ArrayList<>();
-
-                        while (!done.get()) {
-                            int idx = idxGen.getAndIncrement();
-
-                            if (idx >= totalPutCnt) {
-                                info(">>> Stopping producer thread since maximum count of puts reached.");
-
-                                break;
-                            }
-
-                            futs.add(ldr.addData(idx, idx));
-                        }
-
-                        ldr.flush();
-
-                        for (IgniteFuture<?> fut : futs)
-                            fut.get();
-
-                        return null;
-                    }
-                }, 5, "producer");
-
-                IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        while (!done.get()) {
-                            ldr.flush();
-
-                            U.sleep(100);
-                        }
-
-                        return null;
-                    }
-                }, 1, "flusher");
-
-                // Define index of node being restarted.
-                final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
-
-                IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
-                    @Override public Object call() throws Exception {
-                        try {
-                            for (int i = 0; i < 5; i++) {
-                                Ignite g = startGrid(restartNodeIdx);
-
-                                UUID id = g.cluster().localNode().id();
-
-                                info(">>>>>>> Started node: " + id);
-
-                                U.sleep(1000);
-
-                                stopGrid(getTestGridName(restartNodeIdx), true);
-
-                                info(">>>>>>> Stopped node: " + id);
-                            }
-                        }
-                        finally {
-                            done.set(true);
-
-                            info("Start stop thread finished.");
-                        }
-
-                        return null;
-                    }
-                }, 1, "start-stop-thread");
-
-                fut1.get();
-                fut2.get();
-                fut3.get();
-            }
-            finally {
-                ldr.close(false);
-            }
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testLoaderApi() throws Exception {
-        useCache = true;
-
-        try {
-            Ignite g1 = startGrid(1);
-
-            IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
-
-            ldr.close(false);
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-
-            try {
-                // Create another loader.
-                ldr = g1.dataStreamer("UNKNOWN_CACHE");
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            ldr.close(true);
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-
-            // Create another loader.
-            ldr = g1.dataStreamer(null);
-
-            // Cancel with future.
-            ldr.future().cancel();
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            try {
-                ldr.future().get();
-
-                assert false;
-            }
-            catch (IgniteFutureCancelledException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            // Create another loader.
-            ldr = g1.dataStreamer(null);
-
-            // This will close loader.
-            stopGrid(getTestGridName(1), false);
-
-            try {
-                ldr.addData(0, 0);
-
-                assert false;
-            }
-            catch (IllegalStateException e) {
-                info("Caught expected exception: " + e);
-            }
-
-            assert ldr.future().isDone();
-
-            ldr.future().get();
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Wraps integer to closure returning it.
-     *
-     * @param i Value to wrap.
-     * @return Callable.
-     */
-    private static Callable<Integer> callable(@Nullable final Integer i) {
-        return new Callable<Integer>() {
-            @Override public Integer call() throws Exception {
-                return i;
-            }
-        };
-    }
-
-    /**
-     * Wraps integer to closure returning it.
-     *
-     * @param i Value to wrap.
-     * @return Closure.
-     */
-    private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
-        return new IgniteClosure<Integer, Integer>() {
-            @Override public Integer apply(Integer e) {
-                return e == null ? i : e + i;
-            }
-        };
-    }
-
-    /**
-     * Wraps object to closure returning it.
-     *
-     * @param obj Value to wrap.
-     * @return Closure.
-     */
-    private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
-        return new IgniteClosure<T, T>() {
-            @Override public T apply(T e) {
-                assert e == null || obj == null || e.getClass() == obj.getClass() :
-                    "Expects the same types [e=" + e + ", obj=" + obj + ']';
-
-                return obj;
-            }
-        };
-    }
-
-    /**
-     * Wraps integer to closure expecting it and returning {@code null}.
-     *
-     * @param exp Expected closure value.
-     * @return Remove expected cache value closure.
-     */
-    private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
-        return new IgniteClosure<T, T>() {
-            @Override public T apply(T act) {
-                if (exp == null ? act == null : exp.equals(act))
-                    return null;
-
-                throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
-            }
-        };
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFlush() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            final IgniteCache<Integer, Integer> c = g.jcache(null);
-
-            final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
-            ldr.perNodeBufferSize(10);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.localSize() == 0);
-
-            multithreaded(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    ldr.flush();
-
-                    assertEquals(9, c.size());
-
-                    return null;
-                }
-            }, 5, "flush-checker");
-
-            ldr.addData(100, 100);
-
-            ldr.flush();
-
-            assertEquals(10, c.size());
-
-            ldr.addData(200, 200);
-
-            ldr.close(false);
-
-            ldr.future().get();
-
-            assertEquals(11, c.size());
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testTryFlush() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            IgniteCache<Integer, Integer> c = g.jcache(null);
-
-            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
-            ldr.perNodeBufferSize(10);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.localSize() == 0);
-
-            ldr.tryFlush();
-
-            Thread.sleep(100);
-
-            assertEquals(9, c.size());
-
-            ldr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testFlushTimeout() throws Exception {
-        mode = LOCAL;
-
-        useCache = true;
-
-        try {
-            Ignite g = startGrid();
-
-            final CountDownLatch latch = new CountDownLatch(9);
-
-            g.events().localListen(new IgnitePredicate<Event>() {
-                @Override public boolean apply(Event evt) {
-                    latch.countDown();
-
-                    return true;
-                }
-            }, EVT_CACHE_OBJECT_PUT);
-
-            IgniteCache<Integer, Integer> c = g.jcache(null);
-
-            assertTrue(c.localSize() == 0);
-
-            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
-            ldr.perNodeBufferSize(10);
-            ldr.autoFlushFrequency(3000);
-            ldr.allowOverwrite(true);
-
-            for (int i = 0; i < 9; i++)
-                ldr.addData(i, i);
-
-            assertTrue(c.localSize() == 0);
-
-            assertFalse(latch.await(1000, MILLISECONDS));
-
-            assertTrue(c.localSize() == 0);
-
-            assertTrue(latch.await(3000, MILLISECONDS));
-
-            assertEquals(9, c.size());
-
-            ldr.close(false);
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testUpdateStore() throws Exception {
-        storeMap = new ConcurrentHashMap<>();
-
-        try {
-            store = new TestStore();
-
-            useCache = true;
-
-            Ignite ignite = startGrid(1);
-
-            startGrid(2);
-            startGrid(3);
-
-            for (int i = 0; i < 1000; i++)
-                storeMap.put(i, i);
-
-            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
-                ldr.allowOverwrite(true);
-
-                assertFalse(ldr.skipStore());
-
-                for (int i = 0; i < 1000; i++)
-                    ldr.removeData(i);
-
-                for (int i = 1000; i < 2000; i++)
-                    ldr.addData(i, i);
-            }
-
-            for (int i = 0; i < 1000; i++)
-                assertNull(storeMap.get(i));
-
-            for (int i = 1000; i < 2000; i++)
-                assertEquals(i, storeMap.get(i));
-
-            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
-                ldr.allowOverwrite(true);
-
-                ldr.skipStore(true);
-
-                for (int i = 0; i < 1000; i++)
-                    ldr.addData(i, i);
-
-                for (int i = 1000; i < 2000; i++)
-                    ldr.removeData(i);
-            }
-
-            IgniteCache<Object, Object> cache = ignite.jcache(null);
-
-            for (int i = 0; i < 1000; i++) {
-                assertNull(storeMap.get(i));
-
-                assertEquals(i, cache.get(i));
-            }
-
-            for (int i = 1000; i < 2000; i++) {
-                assertEquals(i, storeMap.get(i));
-
-                assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
-            }
-        }
-        finally {
-            storeMap = null;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestObject {
-        /** Value. */
-        private final int val;
-
-        /**
-         * @param val Value.
-         */
-        private TestObject(int val) {
-            this.val = val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            TestObject obj = (TestObject)o;
-
-            return val == obj.val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val;
-        }
-    }
-
-    /**
-     *
-     */
-    private static class TestStore extends CacheStoreAdapter<Object, Object> {
-        /** {@inheritDoc} */
-        @Nullable @Override public Object load(Object key) {
-            return storeMap.get(key);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void write(Cache.Entry<?, ?> entry) {
-            storeMap.put(entry.getKey(), entry.getValue());
-        }
-
-        /** {@inheritDoc} */
-        @Override public void delete(Object key) {
-            storeMap.remove(key);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
new file mode 100644
index 0000000..63b2c248
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests for {@code IgniteDataStreamerImpl}.
+ */
+public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of keys to load via data streamer. */
+    private static final int KEYS_COUNT = 1000;
+
+    /** Started grid counter. */
+    private static int cnt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Forth node goes without cache.
+        if (cnt < 4)
+            cfg.setCacheConfiguration(cacheConfiguration());
+
+        cnt++;
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
+        try {
+            startGrids(5);
+
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+
+            multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    U.awaitQuiet(barrier);
+
+                    G.stopAll(true);
+
+                    return null;
+                }
+            }, 1);
+
+            Ignite g4 = grid(4);
+
+            IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
+
+            dataLdr.perNodeBufferSize(32);
+
+            for (int i = 0; i < 100000; i += 2) {
+                dataLdr.addData(i, i);
+                dataLdr.removeData(i + 1);
+            }
+
+            U.awaitQuiet(barrier);
+
+            info("Closing data streamer.");
+
+            try {
+                dataLdr.close(true);
+            }
+            catch (IllegalStateException ignore) {
+                // This is ok to ignore this exception as test is racy by it's nature -
+                // grid is stopping in different thread.
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Data streamer should correctly load entries from HashMap in case of grids with more than one node
+     *  and with GridOptimizedMarshaller that requires serializable.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAddDataFromMap() throws Exception {
+        try {
+            cnt = 0;
+
+            startGrids(2);
+
+            Ignite g0 = grid(0);
+
+            Marshaller marsh = g0.configuration().getMarshaller();
+
+            if (marsh instanceof OptimizedMarshaller)
+                assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable());
+            else
+                fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
+
+            IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
+
+            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+
+            for (int i = 0; i < KEYS_COUNT; i ++)
+                map.put(i, String.valueOf(i));
+
+            dataLdr.addData(map);
+
+            dataLdr.close();
+
+            Random rnd = new Random();
+
+            IgniteCache<Integer, String> c = g0.jcache(null);
+
+            for (int i = 0; i < KEYS_COUNT; i ++) {
+                Integer k = rnd.nextInt(KEYS_COUNT);
+
+                String v = c.get(k);
+
+                assertEquals(k.toString(), v);
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Gets cache configuration.
+     *
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(1);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cacheCfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestObject implements Serializable {
+        /** */
+        private int val;
+
+        /**
+         */
+        private TestObject() {
+            // No-op.
+        }
+
+        /**
+         * @param val Value.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        public Integer val() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof TestObject && ((TestObject)obj).val == val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
new file mode 100644
index 0000000..33fe310
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Data streamer performance test. Compares group lock data streamer to traditional lock.
+ * <p>
+ * Disable assertions and give at least 2 GB heap to run this test.
+ */
+public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static final int ENTRY_CNT = 80000;
+
+    /** */
+    private boolean useCache;
+
+    /** */
+    private String[] vals = new String[2048];
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setIncludeProperties();
+
+        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+        cfg.setConnectorConfiguration(null);
+
+        cfg.setPeerClassLoadingEnabled(true);
+
+        if (useCache) {
+            CacheConfiguration cc = defaultCacheConfiguration();
+
+            cc.setCacheMode(PARTITIONED);
+
+            cc.setDistributionMode(PARTITIONED_ONLY);
+            cc.setWriteSynchronizationMode(FULL_SYNC);
+            cc.setStartSize(ENTRY_CNT / GRID_CNT);
+            cc.setSwapEnabled(false);
+
+            cc.setBackups(1);
+
+            cc.setStoreValueBytes(true);
+
+            cfg.setCacheSanityCheckEnabled(false);
+            cfg.setCacheConfiguration(cc);
+        }
+        else
+            cfg.setCacheConfiguration();
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        for (int i = 0; i < vals.length; i++) {
+            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
+
+            StringBuilder sb = new StringBuilder();
+
+            for (int j = 0; j < valLen; j++)
+                sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
+
+            vals[i] = sb.toString();
+
+            info("Value: " + vals[i]);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPerformance() throws Exception {
+        doTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        System.gc();
+        System.gc();
+        System.gc();
+
+        try {
+            useCache = true;
+
+            startGridsMultiThreaded(GRID_CNT);
+
+            useCache = false;
+
+            Ignite ignite = startGrid();
+
+            final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
+
+            ldr.perNodeBufferSize(8192);
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted());
+            ldr.autoFlushFrequency(0);
+
+            final LongAdder cnt = new LongAdder();
+
+            long start = U.currentTimeMillis();
+
+            Thread t = new Thread(new Runnable() {
+                @SuppressWarnings("BusyWait")
+                @Override public void run() {
+                    while (true) {
+                        try {
+                            Thread.sleep(10000);
+                        }
+                        catch (InterruptedException ignored) {
+                            break;
+                        }
+
+                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
+                    }
+                }
+            });
+
+            t.setDaemon(true);
+
+            t.start();
+
+            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
+
+            multithreaded(new Callable<Object>() {
+                @SuppressWarnings("InfiniteLoopStatement")
+                @Override public Object call() throws Exception {
+                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+
+                    while (true) {
+                        int i = rnd.nextInt(ENTRY_CNT);
+
+                        ldr.addData(i, vals[rnd.nextInt(vals.length)]);
+
+                        cnt.increment();
+                    }
+                }
+            }, threadNum, "loader");
+
+            info("Closing loader...");
+
+            ldr.close(false);
+
+            long duration = U.currentTimeMillis() - start;
+
+            info("Finished performance test. Duration: " + duration + "ms.");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
new file mode 100644
index 0000000..9402c0c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static ConcurrentHashMap<Object, Object> storeMap;
+
+    /** */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private CacheMode mode = PARTITIONED;
+
+    /** */
+    private boolean nearEnabled = true;
+
+    /** */
+    private boolean useCache;
+
+    /** */
+    private TestStore store;
+
+    /** {@inheritDoc} */
+    @Override public void afterTest() throws Exception {
+        super.afterTest();
+
+        useCache = false;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional", "unchecked"})
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setIncludeProperties();
+
+        cfg.setMarshaller(new OptimizedMarshaller(false));
+
+        if (useCache) {
+            CacheConfiguration cc = defaultCacheConfiguration();
+
+            cc.setCacheMode(mode);
+            cc.setAtomicityMode(TRANSACTIONAL);
+            cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+            cc.setWriteSynchronizationMode(FULL_SYNC);
+
+            cc.setEvictSynchronized(false);
+            cc.setEvictNearSynchronized(false);
+
+            if (store != null) {
+                cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+                cc.setReadThrough(true);
+                cc.setWriteThrough(true);
+            }
+
+            cfg.setCacheConfiguration(cc);
+        }
+        else
+            cfg.setCacheConfiguration();
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitioned() throws Exception {
+        mode = PARTITIONED;
+
+        checkDataStreamer();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testColocated() throws Exception {
+        mode = PARTITIONED;
+        nearEnabled = false;
+
+        checkDataStreamer();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicated() throws Exception {
+        mode = REPLICATED;
+
+        checkDataStreamer();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocal() throws Exception {
+        mode = LOCAL;
+
+        try {
+            checkDataStreamer();
+
+            assert false;
+        }
+        catch (IgniteCheckedException e) {
+            // Cannot load local cache configured remotely.
+            info("Caught expected exception: " + e);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ErrorNotRethrown")
+    private void checkDataStreamer() throws Exception {
+        try {
+            Ignite g1 = startGrid(1);
+
+            useCache = true;
+
+            Ignite g2 = startGrid(2);
+            startGrid(3);
+
+            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
+
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
+
+            final AtomicInteger idxGen = new AtomicInteger();
+            final int cnt = 400;
+            final int threads = 10;
+
+            final CountDownLatch l1 = new CountDownLatch(threads);
+
+            IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; i++) {
+                        int idx = idxGen.getAndIncrement();
+
+                        futs.add(ldr.addData(idx, idx));
+                    }
+
+                    l1.countDown();
+
+                    for (IgniteFuture<?> fut : futs)
+                        fut.get();
+
+                    return null;
+                }
+            }, threads);
+
+            l1.await();
+
+            // This will wait until data streamer finishes loading.
+            stopGrid(getTestGridName(1), false);
+
+            f1.get();
+
+            int s2 = internalCache(2).primaryKeySet().size();
+            int s3 = internalCache(3).primaryKeySet().size();
+            int total = threads * cnt;
+
+            assertEquals(total, s2 + s3);
+
+            final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
+
+            rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
+
+            final CountDownLatch l2 = new CountDownLatch(threads);
+
+            IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+                    for (int i = 0; i < cnt; i++) {
+                        final int key = idxGen.decrementAndGet();
+
+                        futs.add(rmvLdr.removeData(key));
+                    }
+
+                    l2.countDown();
+
+                    for (IgniteFuture<?> fut : futs)
+                        fut.get();
+
+                    return null;
+                }
+            }, threads);
+
+            l2.await();
+
+            rmvLdr.close(false);
+
+            f2.get();
+
+            s2 = internalCache(2).primaryKeySet().size();
+            s3 = internalCache(3).primaryKeySet().size();
+
+            assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedIsolated() throws Exception {
+        mode = PARTITIONED;
+
+        checkIsolatedDataStreamer();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedIsolated() throws Exception {
+        mode = REPLICATED;
+
+        checkIsolatedDataStreamer();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkIsolatedDataStreamer() throws Exception {
+        try {
+            useCache = true;
+
+            Ignite g1 = startGrid(0);
+            startGrid(1);
+            startGrid(2);
+
+            awaitPartitionMapExchange();
+
+            GridCache<Integer, Integer> cache = ((IgniteKernal)grid(0)).cache(null);
+
+            for (int i = 0; i < 100; i++)
+                cache.put(i, -1);
+
+            final int cnt = 40_000;
+            final int threads = 10;
+
+            try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) {
+                final AtomicInteger idxGen = new AtomicInteger();
+
+                IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        for (int i = 0; i < cnt; i++) {
+                            int idx = idxGen.getAndIncrement();
+
+                            ldr.addData(idx, idx);
+                        }
+
+                        return null;
+                    }
+                }, threads);
+
+                f1.get();
+            }
+
+            for (int g = 0; g < 3; g++) {
+                ClusterNode locNode = grid(g).localNode();
+
+                GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null);
+
+                if (cache0.isNear())
+                    cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht();
+
+                CacheAffinity<Integer> aff = cache0.affinity();
+
+                for (int key = 0; key < cnt * threads; key++) {
+                    if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) {
+                        GridCacheEntryEx<Integer, Integer> entry = cache0.peekEx(key);
+
+                        assertNotNull("Missing entry for key: " + key, entry);
+                        assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false));
+                    }
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Test primitive arrays can be passed into data streamer.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPrimitiveArrays() throws Exception {
+        try {
+            useCache = true;
+            mode = PARTITIONED;
+
+            Ignite g1 = startGrid(1);
+            startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
+
+            List<Object> arrays = Arrays.<Object>asList(
+                new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
+                new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
+
+            IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
+
+            for (int i = 0, size = arrays.size(); i < 1000; i++) {
+                Object arr = arrays.get(i % size);
+
+                dataLdr.addData(i, arr);
+                dataLdr.addData(i, fixedClosure(arr));
+            }
+
+            dataLdr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedMultiThreaded() throws Exception {
+        mode = REPLICATED;
+
+        checkLoaderMultithreaded(1, 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPartitionedMultiThreaded() throws Exception {
+        mode = PARTITIONED;
+
+        checkLoaderMultithreaded(1, 3);
+    }
+
+    /**
+     * Tests loader in multithreaded environment with various count of grids started.
+     *
+     * @param nodesCntNoCache How many nodes should be started without cache.
+     * @param nodesCntCache How many nodes should be started with cache.
+     * @throws Exception If failed.
+     */
+    protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
+        throws Exception {
+        try {
+            // Start all required nodes.
+            int idx = 1;
+
+            for (int i = 0; i < nodesCntNoCache; i++)
+                startGrid(idx++);
+
+            useCache = true;
+
+            for (int i = 0; i < nodesCntCache; i++)
+                startGrid(idx++);
+
+            Ignite g1 = grid(1);
+
+            // Get and configure loader.
+            final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
+
+            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
+            ldr.perNodeBufferSize(2);
+
+            // Define count of puts.
+            final AtomicInteger idxGen = new AtomicInteger();
+
+            final AtomicBoolean done = new AtomicBoolean();
+
+            try {
+                final int totalPutCnt = 50000;
+
+                IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+                        while (!done.get()) {
+                            int idx = idxGen.getAndIncrement();
+
+                            if (idx >= totalPutCnt) {
+                                info(">>> Stopping producer thread since maximum count of puts reached.");
+
+                                break;
+                            }
+
+                            futs.add(ldr.addData(idx, idx));
+                        }
+
+                        ldr.flush();
+
+                        for (IgniteFuture<?> fut : futs)
+                            fut.get();
+
+                        return null;
+                    }
+                }, 5, "producer");
+
+                IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        while (!done.get()) {
+                            ldr.flush();
+
+                            U.sleep(100);
+                        }
+
+                        return null;
+                    }
+                }, 1, "flusher");
+
+                // Define index of node being restarted.
+                final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
+
+                IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            for (int i = 0; i < 5; i++) {
+                                Ignite g = startGrid(restartNodeIdx);
+
+                                UUID id = g.cluster().localNode().id();
+
+                                info(">>>>>>> Started node: " + id);
+
+                                U.sleep(1000);
+
+                                stopGrid(getTestGridName(restartNodeIdx), true);
+
+                                info(">>>>>>> Stopped node: " + id);
+                            }
+                        }
+                        finally {
+                            done.set(true);
+
+                            info("Start stop thread finished.");
+                        }
+
+                        return null;
+                    }
+                }, 1, "start-stop-thread");
+
+                fut1.get();
+                fut2.get();
+                fut3.get();
+            }
+            finally {
+                ldr.close(false);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoaderApi() throws Exception {
+        useCache = true;
+
+        try {
+            Ignite g1 = startGrid(1);
+
+            IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
+
+            ldr.close(false);
+
+            try {
+                ldr.addData(0, 0);
+
+                assert false;
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            assert ldr.future().isDone();
+
+            ldr.future().get();
+
+            try {
+                // Create another loader.
+                ldr = g1.dataStreamer("UNKNOWN_CACHE");
+
+                assert false;
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            ldr.close(true);
+
+            assert ldr.future().isDone();
+
+            ldr.future().get();
+
+            // Create another loader.
+            ldr = g1.dataStreamer(null);
+
+            // Cancel with future.
+            ldr.future().cancel();
+
+            try {
+                ldr.addData(0, 0);
+
+                assert false;
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            assert ldr.future().isDone();
+
+            try {
+                ldr.future().get();
+
+                assert false;
+            }
+            catch (IgniteFutureCancelledException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            // Create another loader.
+            ldr = g1.dataStreamer(null);
+
+            // This will close loader.
+            stopGrid(getTestGridName(1), false);
+
+            try {
+                ldr.addData(0, 0);
+
+                assert false;
+            }
+            catch (IllegalStateException e) {
+                info("Caught expected exception: " + e);
+            }
+
+            assert ldr.future().isDone();
+
+            ldr.future().get();
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Wraps integer to closure returning it.
+     *
+     * @param i Value to wrap.
+     * @return Callable.
+     */
+    private static Callable<Integer> callable(@Nullable final Integer i) {
+        return new Callable<Integer>() {
+            @Override public Integer call() throws Exception {
+                return i;
+            }
+        };
+    }
+
+    /**
+     * Wraps integer to closure returning it.
+     *
+     * @param i Value to wrap.
+     * @return Closure.
+     */
+    private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
+        return new IgniteClosure<Integer, Integer>() {
+            @Override public Integer apply(Integer e) {
+                return e == null ? i : e + i;
+            }
+        };
+    }
+
+    /**
+     * Wraps object to closure returning it.
+     *
+     * @param obj Value to wrap.
+     * @return Closure.
+     */
+    private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
+        return new IgniteClosure<T, T>() {
+            @Override public T apply(T e) {
+                assert e == null || obj == null || e.getClass() == obj.getClass() :
+                    "Expects the same types [e=" + e + ", obj=" + obj + ']';
+
+                return obj;
+            }
+        };
+    }
+
+    /**
+     * Wraps integer to closure expecting it and returning {@code null}.
+     *
+     * @param exp Expected closure value.
+     * @return Remove expected cache value closure.
+     */
+    private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
+        return new IgniteClosure<T, T>() {
+            @Override public T apply(T act) {
+                if (exp == null ? act == null : exp.equals(act))
+                    return null;
+
+                throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
+            }
+        };
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFlush() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            final IgniteCache<Integer, Integer> c = g.jcache(null);
+
+            final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+            ldr.perNodeBufferSize(10);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.localSize() == 0);
+
+            multithreaded(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    ldr.flush();
+
+                    assertEquals(9, c.size());
+
+                    return null;
+                }
+            }, 5, "flush-checker");
+
+            ldr.addData(100, 100);
+
+            ldr.flush();
+
+            assertEquals(10, c.size());
+
+            ldr.addData(200, 200);
+
+            ldr.close(false);
+
+            ldr.future().get();
+
+            assertEquals(11, c.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTryFlush() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            IgniteCache<Integer, Integer> c = g.jcache(null);
+
+            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+            ldr.perNodeBufferSize(10);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.localSize() == 0);
+
+            ldr.tryFlush();
+
+            Thread.sleep(100);
+
+            assertEquals(9, c.size());
+
+            ldr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFlushTimeout() throws Exception {
+        mode = LOCAL;
+
+        useCache = true;
+
+        try {
+            Ignite g = startGrid();
+
+            final CountDownLatch latch = new CountDownLatch(9);
+
+            g.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    latch.countDown();
+
+                    return true;
+                }
+            }, EVT_CACHE_OBJECT_PUT);
+
+            IgniteCache<Integer, Integer> c = g.jcache(null);
+
+            assertTrue(c.localSize() == 0);
+
+            IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+            ldr.perNodeBufferSize(10);
+            ldr.autoFlushFrequency(3000);
+            ldr.allowOverwrite(true);
+
+            for (int i = 0; i < 9; i++)
+                ldr.addData(i, i);
+
+            assertTrue(c.localSize() == 0);
+
+            assertFalse(latch.await(1000, MILLISECONDS));
+
+            assertTrue(c.localSize() == 0);
+
+            assertTrue(latch.await(3000, MILLISECONDS));
+
+            assertEquals(9, c.size());
+
+            ldr.close(false);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateStore() throws Exception {
+        storeMap = new ConcurrentHashMap<>();
+
+        try {
+            store = new TestStore();
+
+            useCache = true;
+
+            Ignite ignite = startGrid(1);
+
+            startGrid(2);
+            startGrid(3);
+
+            for (int i = 0; i < 1000; i++)
+                storeMap.put(i, i);
+
+            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+                ldr.allowOverwrite(true);
+
+                assertFalse(ldr.skipStore());
+
+                for (int i = 0; i < 1000; i++)
+                    ldr.removeData(i);
+
+                for (int i = 1000; i < 2000; i++)
+                    ldr.addData(i, i);
+            }
+
+            for (int i = 0; i < 1000; i++)
+                assertNull(storeMap.get(i));
+
+            for (int i = 1000; i < 2000; i++)
+                assertEquals(i, storeMap.get(i));
+
+            try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+                ldr.allowOverwrite(true);
+
+                ldr.skipStore(true);
+
+                for (int i = 0; i < 1000; i++)
+                    ldr.addData(i, i);
+
+                for (int i = 1000; i < 2000; i++)
+                    ldr.removeData(i);
+            }
+
+            IgniteCache<Object, Object> cache = ignite.jcache(null);
+
+            for (int i = 0; i < 1000; i++) {
+                assertNull(storeMap.get(i));
+
+                assertEquals(i, cache.get(i));
+            }
+
+            for (int i = 1000; i < 2000; i++) {
+                assertEquals(i, storeMap.get(i));
+
+                assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
+            }
+        }
+        finally {
+            storeMap = null;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestObject {
+        /** Value. */
+        private final int val;
+
+        /**
+         * @param val Value.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestObject obj = (TestObject)o;
+
+            return val == obj.val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Nullable @Override public Object load(Object key) {
+            return storeMap.get(key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) {
+            storeMap.put(entry.getKey(), entry.getValue());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            storeMap.remove(key);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 30285a8..aea55df 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.preloa
 import org.apache.ignite.internal.processors.cache.expiry.*;
 import org.apache.ignite.internal.processors.cache.integration.*;
 import org.apache.ignite.internal.processors.cache.local.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 
 /**
  * Test suite.


[5/5] incubator-ignite git commit: # gg-9869

Posted by sb...@apache.org.
# gg-9869


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9c8217c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9c8217c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9c8217c1

Branch: refs/heads/ignite-394
Commit: 9c8217c17851487f783bf4fc05d75b4d2996c251
Parents: 96f426b
Author: Artem Shutak <as...@gridgain.com>
Authored: Wed Mar 4 15:27:48 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Wed Mar 4 15:27:48 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContext.java      |    2 +-
 .../ignite/internal/GridKernalContextImpl.java  |    2 +-
 .../apache/ignite/internal/IgniteKernal.java    |    2 +-
 .../communication/GridIoMessageFactory.java     |    2 +-
 .../processors/cache/GridCacheAdapter.java      |    2 +-
 .../GridDistributedCacheAdapter.java            |    2 +-
 .../dataload/GridDataLoadRequest.java           |  450 ------
 .../dataload/GridDataLoadResponse.java          |  166 --
 .../IgniteDataStreamerCacheUpdaters.java        |  199 ---
 .../dataload/IgniteDataStreamerFuture.java      |   75 -
 .../dataload/IgniteDataStreamerImpl.java        | 1453 ------------------
 .../dataload/IgniteDataStreamerProcessor.java   |  316 ----
 .../dataload/IgniteDataStreamerUpdateJob.java   |  119 --
 .../internal/processors/dataload/package.html   |   24 -
 .../datastream/GridDataLoadRequest.java         |  450 ++++++
 .../datastream/GridDataLoadResponse.java        |  166 ++
 .../IgniteDataStreamerCacheUpdaters.java        |  199 +++
 .../datastream/IgniteDataStreamerFuture.java    |   75 +
 .../datastream/IgniteDataStreamerImpl.java      | 1453 ++++++++++++++++++
 .../datastream/IgniteDataStreamerProcessor.java |  316 ++++
 .../datastream/IgniteDataStreamerUpdateJob.java |  119 ++
 .../internal/processors/datastream/package.html |   24 +
 .../processors/igfs/IgfsDataManager.java        |    2 +-
 .../IgniteDataStreamerImplSelfTest.java         |  214 ---
 .../IgniteDataStreamerPerformanceTest.java      |  199 ---
 .../IgniteDataStreamerProcessorSelfTest.java    |  924 -----------
 .../IgniteDataStreamerImplSelfTest.java         |  214 +++
 .../IgniteDataStreamerPerformanceTest.java      |  199 +++
 .../IgniteDataStreamerProcessorSelfTest.java    |  924 +++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |    2 +-
 30 files changed, 4147 insertions(+), 4147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 616eac7..48752cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.processors.clock.*;
 import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.processors.cluster.*;
 import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a38ca92..edc5e00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.processors.clock.*;
 import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.processors.cluster.*;
 import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.igfs.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 60df162..a13da36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -41,7 +41,7 @@ import org.apache.ignite.internal.processors.clock.*;
 import org.apache.ignite.internal.processors.closure.*;
 import org.apache.ignite.internal.processors.cluster.*;
 import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 import org.apache.ignite.internal.processors.datastructures.*;
 import org.apache.ignite.internal.processors.hadoop.*;
 import org.apache.ignite.internal.processors.job.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6109d74..2079233 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.clock.*;
 import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 import org.apache.ignite.internal.processors.igfs.*;
 import org.apache.ignite.internal.processors.rest.handlers.task.*;
 import org.apache.ignite.internal.processors.streamer.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 83118c4..dd9efd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.dr.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 import org.apache.ignite.internal.processors.dr.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.transactions.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 16419f9..f745b5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
deleted file mode 100644
index b3828ed..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- *
- */
-public class GridDataLoadRequest implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long reqId;
-
-    /** */
-    private byte[] resTopicBytes;
-
-    /** Cache name. */
-    private String cacheName;
-
-    /** */
-    private byte[] updaterBytes;
-
-    /** Entries to put. */
-    private byte[] colBytes;
-
-    /** {@code True} to ignore deployment ownership. */
-    private boolean ignoreDepOwnership;
-
-    /** */
-    private boolean skipStore;
-
-    /** */
-    private DeploymentMode depMode;
-
-    /** */
-    private String sampleClsName;
-
-    /** */
-    private String userVer;
-
-    /** Node class loader participants. */
-    @GridToStringInclude
-    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
-    private Map<UUID, IgniteUuid> ldrParticipants;
-
-    /** */
-    private IgniteUuid clsLdrId;
-
-    /** */
-    private boolean forceLocDep;
-
-    /**
-     * {@code Externalizable} support.
-     */
-    public GridDataLoadRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param reqId Request ID.
-     * @param resTopicBytes Response topic.
-     * @param cacheName Cache name.
-     * @param updaterBytes Cache updater.
-     * @param colBytes Collection bytes.
-     * @param ignoreDepOwnership Ignore ownership.
-     * @param skipStore Skip store flag.
-     * @param depMode Deployment mode.
-     * @param sampleClsName Sample class name.
-     * @param userVer User version.
-     * @param ldrParticipants Loader participants.
-     * @param clsLdrId Class loader ID.
-     * @param forceLocDep Force local deployment.
-     */
-    public GridDataLoadRequest(long reqId,
-        byte[] resTopicBytes,
-        @Nullable String cacheName,
-        byte[] updaterBytes,
-        byte[] colBytes,
-        boolean ignoreDepOwnership,
-        boolean skipStore,
-        DeploymentMode depMode,
-        String sampleClsName,
-        String userVer,
-        Map<UUID, IgniteUuid> ldrParticipants,
-        IgniteUuid clsLdrId,
-        boolean forceLocDep) {
-        this.reqId = reqId;
-        this.resTopicBytes = resTopicBytes;
-        this.cacheName = cacheName;
-        this.updaterBytes = updaterBytes;
-        this.colBytes = colBytes;
-        this.ignoreDepOwnership = ignoreDepOwnership;
-        this.skipStore = skipStore;
-        this.depMode = depMode;
-        this.sampleClsName = sampleClsName;
-        this.userVer = userVer;
-        this.ldrParticipants = ldrParticipants;
-        this.clsLdrId = clsLdrId;
-        this.forceLocDep = forceLocDep;
-    }
-
-    /**
-     * @return Request ID.
-     */
-    public long requestId() {
-        return reqId;
-    }
-
-    /**
-     * @return Response topic.
-     */
-    public byte[] responseTopicBytes() {
-        return resTopicBytes;
-    }
-
-    /**
-     * @return Cache name.
-     */
-    public String cacheName() {
-        return cacheName;
-    }
-
-    /**
-     * @return Updater.
-     */
-    public byte[] updaterBytes() {
-        return updaterBytes;
-    }
-
-    /**
-     * @return Collection bytes.
-     */
-    public byte[] collectionBytes() {
-        return colBytes;
-    }
-
-    /**
-     * @return {@code True} to ignore ownership.
-     */
-    public boolean ignoreDeploymentOwnership() {
-        return ignoreDepOwnership;
-    }
-
-    /**
-     * @return Skip store flag.
-     */
-    public boolean skipStore() {
-        return skipStore;
-    }
-
-    /**
-     * @return Deployment mode.
-     */
-    public DeploymentMode deploymentMode() {
-        return depMode;
-    }
-
-    /**
-     * @return Sample class name.
-     */
-    public String sampleClassName() {
-        return sampleClsName;
-    }
-
-    /**
-     * @return User version.
-     */
-    public String userVersion() {
-        return userVer;
-    }
-
-    /**
-     * @return Participants.
-     */
-    public Map<UUID, IgniteUuid> participants() {
-        return ldrParticipants;
-    }
-
-    /**
-     * @return Class loader ID.
-     */
-    public IgniteUuid classLoaderId() {
-        return clsLdrId;
-    }
-
-    /**
-     * @return {@code True} to force local deployment.
-     */
-    public boolean forceLocalDeployment() {
-        return forceLocDep;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDataLoadRequest.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeString("cacheName", cacheName))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeByteArray("colBytes", colBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 3:
-                if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
-                    return false;
-
-                writer.incrementState();
-
-            case 4:
-                if (!writer.writeBoolean("forceLocDep", forceLocDep))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
-                if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
-                    return false;
-
-                writer.incrementState();
-
-            case 6:
-                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
-                    return false;
-
-                writer.incrementState();
-
-            case 7:
-                if (!writer.writeLong("reqId", reqId))
-                    return false;
-
-                writer.incrementState();
-
-            case 8:
-                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 9:
-                if (!writer.writeString("sampleClsName", sampleClsName))
-                    return false;
-
-                writer.incrementState();
-
-            case 10:
-                if (!writer.writeBoolean("skipStore", skipStore))
-                    return false;
-
-                writer.incrementState();
-
-            case 11:
-                if (!writer.writeByteArray("updaterBytes", updaterBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 12:
-                if (!writer.writeString("userVer", userVer))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                cacheName = reader.readString("cacheName");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                clsLdrId = reader.readIgniteUuid("clsLdrId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                colBytes = reader.readByteArray("colBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 3:
-                byte depModeOrd;
-
-                depModeOrd = reader.readByte("depMode");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
-                reader.incrementState();
-
-            case 4:
-                forceLocDep = reader.readBoolean("forceLocDep");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
-                ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 6:
-                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 7:
-                reqId = reader.readLong("reqId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 8:
-                resTopicBytes = reader.readByteArray("resTopicBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 9:
-                sampleClsName = reader.readString("sampleClsName");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 10:
-                skipStore = reader.readBoolean("skipStore");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 11:
-                updaterBytes = reader.readByteArray("updaterBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 12:
-                userVer = reader.readString("userVer");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 62;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 13;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
deleted file mode 100644
index 835e3bd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.nio.*;
-
-/**
- *
- */
-public class GridDataLoadResponse implements Message {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long reqId;
-
-    /** */
-    private byte[] errBytes;
-
-    /** */
-    private boolean forceLocDep;
-
-    /**
-     * @param reqId Request ID.
-     * @param errBytes Error bytes.
-     * @param forceLocDep Force local deployment.
-     */
-    public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
-        this.reqId = reqId;
-        this.errBytes = errBytes;
-        this.forceLocDep = forceLocDep;
-    }
-
-    /**
-     * {@code Externalizable} support.
-     */
-    public GridDataLoadResponse() {
-        // No-op.
-    }
-
-    /**
-     * @return Request ID.
-     */
-    public long requestId() {
-        return reqId;
-    }
-
-    /**
-     * @return Error bytes.
-     */
-    public byte[] errorBytes() {
-        return errBytes;
-    }
-
-    /**
-     * @return {@code True} to force local deployment.
-     */
-    public boolean forceLocalDeployment() {
-        return forceLocDep;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDataLoadResponse.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType(), fieldsCount()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray("errBytes", errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeBoolean("forceLocDep", forceLocDep))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLong("reqId", reqId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!reader.beforeMessageRead())
-            return false;
-
-        switch (reader.state()) {
-            case 0:
-                errBytes = reader.readByteArray("errBytes");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                forceLocDep = reader.readBoolean("forceLocDep");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                reqId = reader.readLong("reqId");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 63;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte fieldsCount() {
-        return 3;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
deleted file mode 100644
index 1742041..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Bundled factory for cache updaters.
- */
-public class IgniteDataStreamerCacheUpdaters {
-    /** */
-    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
-
-    /** */
-    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
-
-    /**
-     * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
-     * is not the best.
-     *
-     * @return Single updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
-        return INDIVIDUAL;
-    }
-
-    /**
-     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
-     * updated concurrently. Performance is generally better than in {@link #individual()}.
-     *
-     * @return Batched updater.
-     */
-    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
-        return BATCHED;
-    }
-
-    /**
-     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
-     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
-     * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
-     *
-     * @return Batched sorted updater.
-     */
-    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
-        return BATCHED_SORTED;
-    }
-
-    /**
-     * Updates cache.
-     *
-     * @param cache Cache.
-     * @param rmvCol Keys to remove.
-     * @param putMap Entries to put.
-     * @throws IgniteException If failed.
-     */
-    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
-        Map<K, V> putMap) {
-        assert rmvCol != null || putMap != null;
-
-        // Here we assume that there are no key duplicates, so the following calls are valid.
-        if (rmvCol != null)
-            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
-
-        if (putMap != null)
-            cache.putAll(putMap);
-    }
-
-    /**
-     * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
-     */
-    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null)
-                    cache.remove(key);
-                else
-                    cache.put(key, val);
-            }
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock prone.
-     */
-    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new HashSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new HashMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-
-    /**
-     * Batched updater. Updates cache using batch operations thus is dead lock prone.
-     */
-    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            assert cache != null;
-            assert !F.isEmpty(entries);
-
-            Map<K, V> putAll = null;
-            Set<K> rmvAll = null;
-
-            for (Map.Entry<K, V> entry : entries) {
-                K key = entry.getKey();
-
-                assert key instanceof Comparable;
-
-                V val = entry.getValue();
-
-                if (val == null) {
-                    if (rmvAll == null)
-                        rmvAll = new TreeSet<>();
-
-                    rmvAll.add(key);
-                }
-                else {
-                    if (putAll == null)
-                        putAll = new TreeMap<>();
-
-                    putAll.put(key, val);
-                }
-            }
-
-            updateAll(cache, rmvAll, putAll);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
deleted file mode 100644
index 5730655..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Data streamer future.
- */
-class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Data streamer. */
-    @GridToStringExclude
-    private IgniteDataStreamerImpl dataLdr;
-
-    /**
-     * Default constructor for {@link Externalizable} support.
-     */
-    public IgniteDataStreamerFuture() {
-        // No-op.
-    }
-
-    /**
-     * @param ctx Context.
-     * @param dataLdr Data streamer.
-     */
-    IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
-        super(ctx);
-
-        assert dataLdr != null;
-
-        this.dataLdr = dataLdr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean cancel() throws IgniteCheckedException {
-        checkValid();
-
-        if (onCancelled()) {
-            dataLdr.closeEx(true);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
deleted file mode 100644
index 1231e27..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
+++ /dev/null
@@ -1,1453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dr.*;
-import org.apache.ignite.internal.processors.portable.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.Map.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- * Data streamer implementation.
- */
-@SuppressWarnings("unchecked")
-public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
-    /** Isolated updater. */
-    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
-
-    /** Cache updater. */
-    private Updater<K, V> updater = ISOLATED_UPDATER;
-
-    /** */
-    private byte[] updaterBytes;
-
-    /** Max remap count before issuing an error. */
-    private static final int DFLT_MAX_REMAP_CNT = 32;
-
-    /** Log reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Cache name ({@code null} for default cache). */
-    private final String cacheName;
-
-    /** Portable enabled flag. */
-    private final boolean portableEnabled;
-
-    /**
-     *  If {@code true} then data will be transferred in compact format (only keys and values).
-     *  Otherwise full map entry will be transferred (this is requires by DR internal logic).
-     */
-    private final boolean compact;
-
-    /** Per-node buffer size. */
-    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
-
-    /** */
-    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
-
-    /** */
-    private long autoFlushFreq;
-
-    /** Mapping. */
-    @GridToStringInclude
-    private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Discovery listener. */
-    private final GridLocalEventListener discoLsnr;
-
-    /** Context. */
-    private final GridKernalContext ctx;
-
-    /** Communication topic for responses. */
-    private final Object topic;
-
-    /** */
-    private byte[] topicBytes;
-
-    /** {@code True} if data streamer has been cancelled. */
-    private volatile boolean cancelled;
-
-    /** Active futures of this data streamer. */
-    @GridToStringInclude
-    private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
-
-    /** Closure to remove from active futures. */
-    @GridToStringExclude
-    private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
-        @Override public void apply(IgniteInternalFuture<?> t) {
-            boolean rmv = activeFuts.remove(t);
-
-            assert rmv;
-        }
-    };
-
-    /** Job peer deploy aware. */
-    private volatile GridPeerDeployAware jobPda;
-
-    /** Deployment class. */
-    private Class<?> depCls;
-
-    /** Future to track loading finish. */
-    private final GridFutureAdapter<?> fut;
-
-    /** Public API future to track loading finish. */
-    private final IgniteFuture<?> publicFut;
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** Closed flag. */
-    private final AtomicBoolean closed = new AtomicBoolean();
-
-    /** */
-    private volatile long lastFlushTime = U.currentTimeMillis();
-
-    /** */
-    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
-
-    /** */
-    private boolean skipStore;
-
-    /** */
-    private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
-
-    /**
-     * @param ctx Grid kernal context.
-     * @param cacheName Cache name.
-     * @param flushQ Flush queue.
-     * @param compact If {@code true} data is transferred in compact mode (only keys and values).
-     *                Otherwise full map entry will be transferred (this is required by DR internal logic).
-     */
-    public IgniteDataStreamerImpl(
-        final GridKernalContext ctx,
-        @Nullable final String cacheName,
-        DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
-        boolean compact
-    ) {
-        assert ctx != null;
-
-        this.ctx = ctx;
-        this.cacheName = cacheName;
-        this.flushQ = flushQ;
-        this.compact = compact;
-
-        log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
-
-        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
-        if (node == null)
-            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
-
-        portableEnabled = ctx.portable().portableEnabled(node, cacheName);
-
-        discoLsnr = new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
-                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
-                UUID id = discoEvt.eventNode().id();
-
-                // Remap regular mappings.
-                final Buffer buf = bufMappings.remove(id);
-
-                if (buf != null) {
-                    // Only async notification is possible since
-                    // discovery thread may be trapped otherwise.
-                    ctx.closure().callLocalSafe(
-                        new Callable<Object>() {
-                            @Override public Object call() throws Exception {
-                                buf.onNodeLeft();
-
-                                return null;
-                            }
-                        },
-                        true /* system pool */
-                    );
-                }
-            }
-        };
-
-        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
-
-        // Generate unique topic for this loader.
-        topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
-
-        ctx.io().addMessageListener(topic, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof GridDataLoadResponse;
-
-                GridDataLoadResponse res = (GridDataLoadResponse)msg;
-
-                if (log.isDebugEnabled())
-                    log.debug("Received data load response: " + res);
-
-                Buffer buf = bufMappings.get(nodeId);
-
-                if (buf != null)
-                    buf.onResponse(res);
-
-                else if (log.isDebugEnabled())
-                    log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
-            }
-        });
-
-        if (log.isDebugEnabled())
-            log.debug("Added response listener within topic: " + topic);
-
-        fut = new IgniteDataStreamerFuture(ctx, this);
-
-        publicFut = new IgniteFutureImpl<>(fut);
-    }
-
-    /**
-     * Enters busy lock.
-     */
-    private void enterBusy() {
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Data streamer has been closed.");
-    }
-
-    /**
-     * Leaves busy lock.
-     */
-    private void leaveBusy() {
-        busyLock.leaveBusy();
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> future() {
-        return publicFut;
-    }
-
-    /**
-     * @return Internal future.
-     */
-    public IgniteInternalFuture<?> internalFuture() {
-        return fut;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void deployClass(Class<?> depCls) {
-        this.depCls = depCls;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void updater(Updater<K, V> updater) {
-        A.notNull(updater, "updater");
-
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean allowOverwrite() {
-        return updater != ISOLATED_UPDATER;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void allowOverwrite(boolean allow) {
-        if (allow == allowOverwrite())
-            return;
-
-        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
-        if (node == null)
-            throw new IgniteException("Failed to get node for cache: " + cacheName);
-
-        updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean skipStore() {
-        return skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void skipStore(boolean skipStore) {
-        this.skipStore = skipStore;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public String cacheName() {
-        return cacheName;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int perNodeBufferSize() {
-        return bufSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void perNodeBufferSize(int bufSize) {
-        A.ensure(bufSize > 0, "bufSize > 0");
-
-        this.bufSize = bufSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int perNodeParallelLoadOperations() {
-        return parallelOps;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void perNodeParallelLoadOperations(int parallelOps) {
-        this.parallelOps = parallelOps;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long autoFlushFrequency() {
-        return autoFlushFreq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void autoFlushFrequency(long autoFlushFreq) {
-        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
-
-        long old = this.autoFlushFreq;
-
-        if (autoFlushFreq != old) {
-            this.autoFlushFreq = autoFlushFreq;
-
-            if (autoFlushFreq != 0 && old == 0)
-                flushQ.add(this);
-            else if (autoFlushFreq == 0)
-                flushQ.remove(this);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
-        A.notNull(entries, "entries");
-
-        return addData(entries.entrySet());
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
-        A.notEmpty(entries, "entries");
-
-        enterBusy();
-
-        try {
-            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
-
-            resFut.listenAsync(rmvActiveFut);
-
-            activeFuts.add(resFut);
-
-            Collection<K> keys = null;
-
-            if (entries.size() > 1) {
-                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
-
-                for (Map.Entry<K, V> entry : entries)
-                    keys.add(entry.getKey());
-            }
-
-            load0(entries, resFut, keys, 0);
-
-            return new IgniteFutureImpl<>(resFut);
-        }
-        catch (IgniteException e) {
-            return new IgniteFinishedFutureImpl<>(ctx, e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
-        A.notNull(entry, "entry");
-
-        return addData(F.asList(entry));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> addData(K key, V val) {
-        A.notNull(key, "key");
-
-        return addData(new Entry0<>(key, val));
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteFuture<?> removeData(K key) {
-        return addData(key, null);
-    }
-
-    /**
-     * @param entries Entries.
-     * @param resFut Result future.
-     * @param activeKeys Active keys.
-     * @param remaps Remaps count.
-     */
-    private void load0(
-        Collection<? extends Map.Entry<K, V>> entries,
-        final GridFutureAdapter<Object> resFut,
-        @Nullable final Collection<K> activeKeys,
-        final int remaps
-    ) {
-        assert entries != null;
-
-        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
-
-        boolean initPda = ctx.deploy().enabled() && jobPda == null;
-
-        for (Map.Entry<K, V> entry : entries) {
-            List<ClusterNode> nodes;
-
-            try {
-                K key = entry.getKey();
-
-                assert key != null;
-
-                if (initPda) {
-                    jobPda = new DataStreamerPda(key, entry.getValue(), updater);
-
-                    initPda = false;
-                }
-
-                nodes = nodes(key);
-            }
-            catch (IgniteCheckedException e) {
-                resFut.onDone(e);
-
-                return;
-            }
-
-            if (F.isEmpty(nodes)) {
-                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
-                    "(no nodes with cache found in topology) [infos=" + entries.size() +
-                    ", cacheName=" + cacheName + ']'));
-
-                return;
-            }
-
-            for (ClusterNode node : nodes) {
-                Collection<Map.Entry<K, V>> col = mappings.get(node);
-
-                if (col == null)
-                    mappings.put(node, col = new ArrayList<>());
-
-                col.add(entry);
-            }
-        }
-
-        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
-            final UUID nodeId = e.getKey().id();
-
-            Buffer buf = bufMappings.get(nodeId);
-
-            if (buf == null) {
-                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
-
-                if (old != null)
-                    buf = old;
-            }
-
-            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
-
-            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> t) {
-                    try {
-                        t.get();
-
-                        if (activeKeys != null) {
-                            for (Map.Entry<K, V> e : entriesForNode)
-                                activeKeys.remove(e.getKey());
-
-                            if (activeKeys.isEmpty())
-                                resFut.onDone();
-                        }
-                        else {
-                            assert entriesForNode.size() == 1;
-
-                            // That has been a single key,
-                            // so complete result future right away.
-                            resFut.onDone();
-                        }
-                    }
-                    catch (IgniteCheckedException e1) {
-                        if (log.isDebugEnabled())
-                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
-
-                        if (cancelled) {
-                            resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
-                                IgniteDataStreamerImpl.this, e1));
-                        }
-                        else if (remaps + 1 > maxRemapCnt) {
-                            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
-                                + remaps), e1);
-                        }
-                        else
-                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
-                    }
-                }
-            };
-
-            GridFutureAdapter<?> f;
-
-            try {
-                f = buf.update(entriesForNode, lsnr);
-            }
-            catch (IgniteInterruptedCheckedException e1) {
-                resFut.onDone(e1);
-
-                return;
-            }
-
-            if (ctx.discovery().node(nodeId) == null) {
-                if (bufMappings.remove(nodeId, buf))
-                    buf.onNodeLeft();
-
-                if (f != null)
-                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
-                        "(node has left): " + nodeId));
-            }
-        }
-    }
-
-    /**
-     * @param key Key to map.
-     * @return Nodes to send requests to.
-     * @throws IgniteCheckedException If failed.
-     */
-    private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
-        GridAffinityProcessor aff = ctx.affinity();
-
-        return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
-            Collections.singletonList(aff.mapKeyToNode(cacheName, key));
-    }
-
-    /**
-     * Performs flush.
-     *
-     * @throws IgniteCheckedException If failed.
-     */
-    private void doFlush() throws IgniteCheckedException {
-        lastFlushTime = U.currentTimeMillis();
-
-        List<IgniteInternalFuture> activeFuts0 = null;
-
-        int doneCnt = 0;
-
-        for (IgniteInternalFuture<?> f : activeFuts) {
-            if (!f.isDone()) {
-                if (activeFuts0 == null)
-                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
-
-                activeFuts0.add(f);
-            }
-            else {
-                f.get();
-
-                doneCnt++;
-            }
-        }
-
-        if (activeFuts0 == null || activeFuts0.isEmpty())
-            return;
-
-        while (true) {
-            Queue<IgniteInternalFuture<?>> q = null;
-
-            for (Buffer buf : bufMappings.values()) {
-                IgniteInternalFuture<?> flushFut = buf.flush();
-
-                if (flushFut != null) {
-                    if (q == null)
-                        q = new ArrayDeque<>(bufMappings.size() * 2);
-
-                    q.add(flushFut);
-                }
-            }
-
-            if (q != null) {
-                assert !q.isEmpty();
-
-                boolean err = false;
-
-                for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
-                    try {
-                        fut.get();
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (log.isDebugEnabled())
-                            log.debug("Failed to flush buffer: " + e);
-
-                        err = true;
-                    }
-                }
-
-                if (err)
-                    // Remaps needed - flush buffers.
-                    continue;
-            }
-
-            doneCnt = 0;
-
-            for (int i = 0; i < activeFuts0.size(); i++) {
-                IgniteInternalFuture f = activeFuts0.get(i);
-
-                if (f == null)
-                    doneCnt++;
-                else if (f.isDone()) {
-                    f.get();
-
-                    doneCnt++;
-
-                    activeFuts0.set(i, null);
-                }
-                else
-                    break;
-            }
-
-            if (doneCnt == activeFuts0.size())
-                return;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public void flush() throws IgniteException {
-        enterBusy();
-
-        try {
-            doFlush();
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * Flushes every internal buffer if buffer was flushed before passed in
-     * threshold.
-     * <p>
-     * Does not wait for result and does not fail on errors assuming that this method
-     * should be called periodically.
-     */
-    @Override public void tryFlush() throws IgniteInterruptedException {
-        if (!busyLock.enterBusy())
-            return;
-
-        try {
-            for (Buffer buf : bufMappings.values())
-                buf.flush();
-
-            lastFlushTime = U.currentTimeMillis();
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            leaveBusy();
-        }
-    }
-
-    /**
-     * @param cancel {@code True} to close with cancellation.
-     * @throws IgniteException If failed.
-     */
-    @Override public void close(boolean cancel) throws IgniteException {
-        try {
-            closeEx(cancel);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-    }
-
-    /**
-     * @param cancel {@code True} to close with cancellation.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void closeEx(boolean cancel) throws IgniteCheckedException {
-        if (!closed.compareAndSet(false, true))
-            return;
-
-        busyLock.block();
-
-        if (log.isDebugEnabled())
-            log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
-
-        IgniteCheckedException e = null;
-
-        try {
-            // Assuming that no methods are called on this loader after this method is called.
-            if (cancel) {
-                cancelled = true;
-
-                for (Buffer buf : bufMappings.values())
-                    buf.cancelAll();
-            }
-            else
-                doFlush();
-
-            ctx.event().removeLocalEventListener(discoLsnr);
-
-            ctx.io().removeMessageListener(topic);
-        }
-        catch (IgniteCheckedException e0) {
-            e = e0;
-        }
-
-        fut.onDone(null, e);
-
-        if (e != null)
-            throw e;
-    }
-
-    /**
-     * @return {@code true} If the loader is closed.
-     */
-    boolean isClosed() {
-        return fut.isDone();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() throws IgniteException {
-        close(false);
-    }
-
-    /**
-     * @return Max remap count.
-     */
-    public int maxRemapCount() {
-        return maxRemapCnt;
-    }
-
-    /**
-     * @param maxRemapCnt New max remap count.
-     */
-    public void maxRemapCount(int maxRemapCnt) {
-        this.maxRemapCnt = maxRemapCnt;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgniteDataStreamerImpl.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getDelay(TimeUnit unit) {
-        return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * @return Next flush time.
-     */
-    private long nextFlushTime() {
-        return lastFlushTime + autoFlushFreq;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(Delayed o) {
-        return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1;
-    }
-
-    /**
-     *
-     */
-    private class Buffer {
-        /** Node. */
-        private final ClusterNode node;
-
-        /** Active futures. */
-        private final Collection<IgniteInternalFuture<Object>> locFuts;
-
-        /** Buffered entries. */
-        private List<Map.Entry<K, V>> entries;
-
-        /** */
-        @GridToStringExclude
-        private GridFutureAdapter<Object> curFut;
-
-        /** Local node flag. */
-        private final boolean isLocNode;
-
-        /** ID generator. */
-        private final AtomicLong idGen = new AtomicLong();
-
-        /** Active futures. */
-        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
-
-        /** */
-        private final Semaphore sem;
-
-        /** Closure to signal on task finish. */
-        @GridToStringExclude
-        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
-            @Override public void apply(IgniteInternalFuture<Object> t) {
-                signalTaskFinished(t);
-            }
-        };
-
-        /**
-         * @param node Node.
-         */
-        Buffer(ClusterNode node) {
-            assert node != null;
-
-            this.node = node;
-
-            locFuts = new GridConcurrentHashSet<>();
-            reqs = new ConcurrentHashMap8<>();
-
-            // Cache local node flag.
-            isLocNode = node.equals(ctx.discovery().localNode());
-
-            entries = newEntries();
-            curFut = new GridFutureAdapter<>(ctx);
-            curFut.listenAsync(signalC);
-
-            sem = new Semaphore(parallelOps);
-        }
-
-        /**
-         * @param newEntries Infos.
-         * @param lsnr Listener for the operation future.
-         * @throws IgniteInterruptedCheckedException If failed.
-         * @return Future for operation.
-         */
-        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
-            IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
-            List<Map.Entry<K, V>> entries0 = null;
-            GridFutureAdapter<Object> curFut0;
-
-            synchronized (this) {
-                curFut0 = curFut;
-
-                curFut0.listenAsync(lsnr);
-
-                for (Map.Entry<K, V> entry : newEntries)
-                    entries.add(entry);
-
-                if (entries.size() >= bufSize) {
-                    entries0 = entries;
-
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>(ctx);
-                    curFut.listenAsync(signalC);
-                }
-            }
-
-            if (entries0 != null) {
-                submit(entries0, curFut0);
-
-                if (cancelled)
-                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this));
-            }
-
-            return curFut0;
-        }
-
-        /**
-         * @return Fresh collection with some space for outgrowth.
-         */
-        private List<Map.Entry<K, V>> newEntries() {
-            return new ArrayList<>((int)(bufSize * 1.2));
-        }
-
-        /**
-         * @return Future if any submitted.
-         *
-         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
-         */
-        @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
-            List<Map.Entry<K, V>> entries0 = null;
-            GridFutureAdapter<Object> curFut0 = null;
-
-            synchronized (this) {
-                if (!entries.isEmpty()) {
-                    entries0 = entries;
-                    curFut0 = curFut;
-
-                    entries = newEntries();
-                    curFut = new GridFutureAdapter<>(ctx);
-                    curFut.listenAsync(signalC);
-                }
-            }
-
-            if (entries0 != null)
-                submit(entries0, curFut0);
-
-            // Create compound future for this flush.
-            GridCompoundFuture<Object, Object> res = null;
-
-            for (IgniteInternalFuture<Object> f : locFuts) {
-                if (res == null)
-                    res = new GridCompoundFuture<>(ctx);
-
-                res.add(f);
-            }
-
-            for (IgniteInternalFuture<Object> f : reqs.values()) {
-                if (res == null)
-                    res = new GridCompoundFuture<>(ctx);
-
-                res.add(f);
-            }
-
-            if (res != null)
-                res.markInitialized();
-
-            return res;
-        }
-
-        /**
-         * Increments active tasks count.
-         *
-         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
-         */
-        private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
-            U.acquire(sem);
-        }
-
-        /**
-         * @param f Future that finished.
-         */
-        private void signalTaskFinished(IgniteInternalFuture<Object> f) {
-            assert f != null;
-
-            sem.release();
-        }
-
-        /**
-         * @param entries Entries to submit.
-         * @param curFut Current future.
-         * @throws IgniteInterruptedCheckedException If interrupted.
-         */
-        private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
-            throws IgniteInterruptedCheckedException {
-            assert entries != null;
-            assert !entries.isEmpty();
-            assert curFut != null;
-
-            incrementActiveTasks();
-
-            IgniteInternalFuture<Object> fut;
-
-            if (isLocNode) {
-                fut = ctx.closure().callLocalSafe(
-                    new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
-
-                locFuts.add(fut);
-
-                fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
-                    @Override public void apply(IgniteInternalFuture<Object> t) {
-                        try {
-                            boolean rmv = locFuts.remove(t);
-
-                            assert rmv;
-
-                            curFut.onDone(t.get());
-                        }
-                        catch (IgniteCheckedException e) {
-                            curFut.onDone(e);
-                        }
-                    }
-                });
-            }
-            else {
-                byte[] entriesBytes;
-
-                try {
-                    if (compact) {
-                        entriesBytes = ctx.config().getMarshaller()
-                            .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
-                    }
-                    else
-                        entriesBytes = ctx.config().getMarshaller().marshal(entries);
-
-                    if (updaterBytes == null) {
-                        assert updater != null;
-
-                        updaterBytes = ctx.config().getMarshaller().marshal(updater);
-                    }
-
-                    if (topicBytes == null)
-                        topicBytes = ctx.config().getMarshaller().marshal(topic);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to marshal (request will not be sent).", e);
-
-                    return;
-                }
-
-                GridDeployment dep = null;
-                GridPeerDeployAware jobPda0 = null;
-
-                if (ctx.deploy().enabled()) {
-                    try {
-                        jobPda0 = jobPda;
-
-                        assert jobPda0 != null;
-
-                        dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
-
-                        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-
-                        if (cache != null)
-                            cache.context().deploy().onEnter();
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
-
-                        return;
-                    }
-
-                    if (dep == null)
-                        U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
-                }
-
-                long reqId = idGen.incrementAndGet();
-
-                fut = curFut;
-
-                reqs.put(reqId, (GridFutureAdapter<Object>)fut);
-
-                GridDataLoadRequest req = new GridDataLoadRequest(
-                    reqId,
-                    topicBytes,
-                    cacheName,
-                    updaterBytes,
-                    entriesBytes,
-                    true,
-                    skipStore,
-                    dep != null ? dep.deployMode() : null,
-                    dep != null ? jobPda0.deployClass().getName() : null,
-                    dep != null ? dep.userVersion() : null,
-                    dep != null ? dep.participants() : null,
-                    dep != null ? dep.classLoaderId() : null,
-                    dep == null);
-
-                try {
-                    ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
-                }
-                catch (IgniteCheckedException e) {
-                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
-                        ((GridFutureAdapter<Object>)fut).onDone(e);
-                    else
-                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
-                            "request (node has left): " + node.id()));
-                }
-            }
-        }
-
-        /**
-         *
-         */
-        void onNodeLeft() {
-            assert !isLocNode;
-            assert bufMappings.get(node.id()) != this;
-
-            if (log.isDebugEnabled())
-                log.debug("Forcibly completing futures (node has left): " + node.id());
-
-            Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
-                "(node has left): " + node.id());
-
-            for (GridFutureAdapter<Object> f : reqs.values())
-                f.onDone(e);
-
-            // Make sure to complete current future.
-            GridFutureAdapter<Object> curFut0;
-
-            synchronized (this) {
-                curFut0 = curFut;
-            }
-
-            curFut0.onDone(e);
-        }
-
-        /**
-         * @param res Response.
-         */
-        void onResponse(GridDataLoadResponse res) {
-            if (log.isDebugEnabled())
-                log.debug("Received data load response: " + res);
-
-            GridFutureAdapter<?> f = reqs.remove(res.requestId());
-
-            if (f == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Future for request has not been found: " + res.requestId());
-
-                return;
-            }
-
-            Throwable err = null;
-
-            byte[] errBytes = res.errorBytes();
-
-            if (errBytes != null) {
-                try {
-                    GridPeerDeployAware jobPda0 = jobPda;
-
-                    err = ctx.config().getMarshaller().unmarshal(
-                        errBytes,
-                        jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
-                }
-                catch (IgniteCheckedException e) {
-                    f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
-
-                    return;
-                }
-            }
-
-            f.onDone(null, err);
-
-            if (log.isDebugEnabled())
-                log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
-        }
-
-        /**
-         *
-         */
-        void cancelAll() {
-            IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this);
-
-            for (IgniteInternalFuture<?> f : locFuts) {
-                try {
-                    f.cancel();
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to cancel mini-future.", e);
-                }
-            }
-
-            for (GridFutureAdapter<?> f : reqs.values())
-                f.onDone(err);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            int size;
-
-            synchronized (this) {
-                size = entries.size();
-            }
-
-            return S.toString(Buffer.class, this,
-                "entriesCnt", size,
-                "locFutsSize", locFuts.size(),
-                "reqsSize", reqs.size());
-        }
-    }
-
-    /**
-     * Data streamer peer-deploy aware.
-     */
-    private class DataStreamerPda implements GridPeerDeployAware {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Deploy class. */
-        private Class<?> cls;
-
-        /** Class loader. */
-        private ClassLoader ldr;
-
-        /** Collection of objects to detect deploy class and class loader. */
-        private Collection<Object> objs;
-
-        /**
-         * Constructs data streamer peer-deploy aware.
-         *
-         * @param objs Collection of objects to detect deploy class and class loader.
-         */
-        private DataStreamerPda(Object... objs) {
-            this.objs = Arrays.asList(objs);
-        }
-
-        /** {@inheritDoc} */
-        @Override public Class<?> deployClass() {
-            if (cls == null) {
-                Class<?> cls0 = null;
-
-                if (depCls != null)
-                    cls0 = depCls;
-                else {
-                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
-                        Object o = it.next();
-
-                        if (o != null)
-                            cls0 = U.detectClass(o);
-                    }
-
-                    if (cls0 == null || U.isJdk(cls0))
-                        cls0 = IgniteDataStreamerImpl.class;
-                }
-
-                assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
-
-                cls = cls0;
-            }
-
-            return cls;
-        }
-
-        /** {@inheritDoc} */
-        @Override public ClassLoader classLoader() {
-            if (ldr == null) {
-                ClassLoader ldr0 = deployClass().getClassLoader();
-
-                // Safety.
-                if (ldr0 == null)
-                    ldr0 = U.gridClassLoader();
-
-                assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
-
-                ldr = ldr0;
-            }
-
-            return ldr;
-        }
-    }
-
-    /**
-     * Entry.
-     */
-    private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private K key;
-
-        /** */
-        private V val;
-
-        /**
-         * @param key Key.
-         * @param val Value.
-         */
-        private Entry0(K key, @Nullable V val) {
-            assert key != null;
-
-            this.key = key;
-            this.val = val;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        @SuppressWarnings("UnusedDeclaration")
-        public Entry0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public K getKey() {
-            return key;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V getValue() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public V setValue(V val) {
-            V old = this.val;
-
-            this.val = val;
-
-            return old;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(key);
-            out.writeObject(val);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            key = (K)in.readObject();
-            val = (V)in.readObject();
-        }
-    }
-
-    /**
-     * Wrapper list with special compact serialization of map entries.
-     */
-    private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /**  Wrapped delegate. */
-        private Collection<Map.Entry<K, V>> delegate;
-
-        /** Optional portable processor for converting values. */
-        private GridPortableProcessor portable;
-
-        /**
-         * @param delegate Delegate.
-         * @param portable Portable processor.
-         */
-        private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
-            this.delegate = delegate;
-            this.portable = portable;
-        }
-
-        /**
-         * For {@link Externalizable}.
-         */
-        public Entries0() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public Iterator<Entry<K, V>> iterator() {
-            return delegate.iterator();
-        }
-
-        /** {@inheritDoc} */
-        @Override public int size() {
-            return delegate.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(delegate.size());
-
-            boolean portableEnabled = portable != null;
-
-            for (Map.Entry<K, V> entry : delegate) {
-                if (portableEnabled) {
-                    out.writeObject(portable.marshalToPortable(entry.getKey()));
-                    out.writeObject(portable.marshalToPortable(entry.getValue()));
-                }
-                else {
-                    out.writeObject(entry.getKey());
-                    out.writeObject(entry.getValue());
-                }
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            int sz = in.readInt();
-
-            delegate = new ArrayList<>(sz);
-
-            for (int i = 0; i < sz; i++) {
-                Object k = in.readObject();
-                Object v = in.readObject();
-
-                delegate.add(new Entry0<>((K)k, (V)v));
-            }
-        }
-    }
-
-    /**
-     * Isolated updater which only loads entry initial value.
-     */
-    private static class IsolatedUpdater<K, V> implements Updater<K, V> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
-
-            GridCacheAdapter<K, V> internalCache = proxy.context().cache();
-
-            if (internalCache.isNear())
-                internalCache = internalCache.context().near().dht();
-
-            GridCacheContext<K, V> cctx = internalCache.context();
-
-            long topVer = cctx.affinity().affinityTopologyVersion();
-
-            GridCacheVersion ver = cctx.versions().next(topVer);
-
-            boolean portable = cctx.portableEnabled();
-
-            for (Map.Entry<K, V> e : entries) {
-                try {
-                    K key = e.getKey();
-                    V val = e.getValue();
-
-                    if (portable) {
-                        key = (K)cctx.marshalToPortable(key);
-                        val = (V)cctx.marshalToPortable(val);
-                    }
-
-                    GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
-
-                    entry.unswap(true, false);
-
-                    entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
-                        GridDrType.DR_LOAD);
-
-                    cctx.evicts().touch(entry, topVer);
-                }
-                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
-                    // No-op.
-                }
-                catch (IgniteCheckedException ex) {
-                    IgniteLogger log = cache.unwrap(Ignite.class).log();
-
-                    U.error(log, "Failed to set initial value for cache entry: " + e, ex);
-                }
-            }
-        }
-    }
-}


[3/5] incubator-ignite git commit: # gg-9869

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
new file mode 100644
index 0000000..9ac3ec8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
@@ -0,0 +1,1453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.internal.processors.portable.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Data streamer implementation.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+    /** Isolated updater. */
+    private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
+    /** Cache updater. */
+    private Updater<K, V> updater = ISOLATED_UPDATER;
+
+    /** */
+    private byte[] updaterBytes;
+
+    /** Max remap count before issuing an error. */
+    private static final int DFLT_MAX_REMAP_CNT = 32;
+
+    /** Log reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Cache name ({@code null} for default cache). */
+    private final String cacheName;
+
+    /** Portable enabled flag. */
+    private final boolean portableEnabled;
+
+    /**
+     *  If {@code true} then data will be transferred in compact format (only keys and values).
+     *  Otherwise full map entry will be transferred (this is requires by DR internal logic).
+     */
+    private final boolean compact;
+
+    /** Per-node buffer size. */
+    @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+    private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+    /** */
+    private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+    /** */
+    private long autoFlushFreq;
+
+    /** Mapping. */
+    @GridToStringInclude
+    private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Discovery listener. */
+    private final GridLocalEventListener discoLsnr;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /** Communication topic for responses. */
+    private final Object topic;
+
+    /** */
+    private byte[] topicBytes;
+
+    /** {@code True} if data streamer has been cancelled. */
+    private volatile boolean cancelled;
+
+    /** Active futures of this data streamer. */
+    @GridToStringInclude
+    private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
+
+    /** Closure to remove from active futures. */
+    @GridToStringExclude
+    private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
+        @Override public void apply(IgniteInternalFuture<?> t) {
+            boolean rmv = activeFuts.remove(t);
+
+            assert rmv;
+        }
+    };
+
+    /** Job peer deploy aware. */
+    private volatile GridPeerDeployAware jobPda;
+
+    /** Deployment class. */
+    private Class<?> depCls;
+
+    /** Future to track loading finish. */
+    private final GridFutureAdapter<?> fut;
+
+    /** Public API future to track loading finish. */
+    private final IgniteFuture<?> publicFut;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Closed flag. */
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /** */
+    private volatile long lastFlushTime = U.currentTimeMillis();
+
+    /** */
+    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
+
+    /** */
+    private boolean skipStore;
+
+    /** */
+    private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+
+    /**
+     * @param ctx Grid kernal context.
+     * @param cacheName Cache name.
+     * @param flushQ Flush queue.
+     * @param compact If {@code true} data is transferred in compact mode (only keys and values).
+     *                Otherwise full map entry will be transferred (this is required by DR internal logic).
+     */
+    public IgniteDataStreamerImpl(
+        final GridKernalContext ctx,
+        @Nullable final String cacheName,
+        DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
+        boolean compact
+    ) {
+        assert ctx != null;
+
+        this.ctx = ctx;
+        this.cacheName = cacheName;
+        this.flushQ = flushQ;
+        this.compact = compact;
+
+        log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
+
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+        if (node == null)
+            throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+
+        portableEnabled = ctx.portable().portableEnabled(node, cacheName);
+
+        discoLsnr = new GridLocalEventListener() {
+            @Override public void onEvent(Event evt) {
+                assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                UUID id = discoEvt.eventNode().id();
+
+                // Remap regular mappings.
+                final Buffer buf = bufMappings.remove(id);
+
+                if (buf != null) {
+                    // Only async notification is possible since
+                    // discovery thread may be trapped otherwise.
+                    ctx.closure().callLocalSafe(
+                        new Callable<Object>() {
+                            @Override public Object call() throws Exception {
+                                buf.onNodeLeft();
+
+                                return null;
+                            }
+                        },
+                        true /* system pool */
+                    );
+                }
+            }
+        };
+
+        ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+        // Generate unique topic for this loader.
+        topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+        ctx.io().addMessageListener(topic, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof GridDataLoadResponse;
+
+                GridDataLoadResponse res = (GridDataLoadResponse)msg;
+
+                if (log.isDebugEnabled())
+                    log.debug("Received data load response: " + res);
+
+                Buffer buf = bufMappings.get(nodeId);
+
+                if (buf != null)
+                    buf.onResponse(res);
+
+                else if (log.isDebugEnabled())
+                    log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
+            }
+        });
+
+        if (log.isDebugEnabled())
+            log.debug("Added response listener within topic: " + topic);
+
+        fut = new IgniteDataStreamerFuture(ctx, this);
+
+        publicFut = new IgniteFutureImpl<>(fut);
+    }
+
+    /**
+     * Enters busy lock.
+     */
+    private void enterBusy() {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Data streamer has been closed.");
+    }
+
+    /**
+     * Leaves busy lock.
+     */
+    private void leaveBusy() {
+        busyLock.leaveBusy();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> future() {
+        return publicFut;
+    }
+
+    /**
+     * @return Internal future.
+     */
+    public IgniteInternalFuture<?> internalFuture() {
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deployClass(Class<?> depCls) {
+        this.depCls = depCls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void updater(Updater<K, V> updater) {
+        A.notNull(updater, "updater");
+
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean allowOverwrite() {
+        return updater != ISOLATED_UPDATER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void allowOverwrite(boolean allow) {
+        if (allow == allowOverwrite())
+            return;
+
+        ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+        if (node == null)
+            throw new IgniteException("Failed to get node for cache: " + cacheName);
+
+        updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void skipStore(boolean skipStore) {
+        this.skipStore = skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public String cacheName() {
+        return cacheName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int perNodeBufferSize() {
+        return bufSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void perNodeBufferSize(int bufSize) {
+        A.ensure(bufSize > 0, "bufSize > 0");
+
+        this.bufSize = bufSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int perNodeParallelLoadOperations() {
+        return parallelOps;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void perNodeParallelLoadOperations(int parallelOps) {
+        this.parallelOps = parallelOps;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long autoFlushFrequency() {
+        return autoFlushFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void autoFlushFrequency(long autoFlushFreq) {
+        A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+
+        long old = this.autoFlushFreq;
+
+        if (autoFlushFreq != old) {
+            this.autoFlushFreq = autoFlushFreq;
+
+            if (autoFlushFreq != 0 && old == 0)
+                flushQ.add(this);
+            else if (autoFlushFreq == 0)
+                flushQ.remove(this);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+        A.notNull(entries, "entries");
+
+        return addData(entries.entrySet());
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+        A.notEmpty(entries, "entries");
+
+        enterBusy();
+
+        try {
+            GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
+
+            resFut.listenAsync(rmvActiveFut);
+
+            activeFuts.add(resFut);
+
+            Collection<K> keys = null;
+
+            if (entries.size() > 1) {
+                keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+                for (Map.Entry<K, V> entry : entries)
+                    keys.add(entry.getKey());
+            }
+
+            load0(entries, resFut, keys, 0);
+
+            return new IgniteFutureImpl<>(resFut);
+        }
+        catch (IgniteException e) {
+            return new IgniteFinishedFutureImpl<>(ctx, e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+        A.notNull(entry, "entry");
+
+        return addData(F.asList(entry));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> addData(K key, V val) {
+        A.notNull(key, "key");
+
+        return addData(new Entry0<>(key, val));
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<?> removeData(K key) {
+        return addData(key, null);
+    }
+
+    /**
+     * @param entries Entries.
+     * @param resFut Result future.
+     * @param activeKeys Active keys.
+     * @param remaps Remaps count.
+     */
+    private void load0(
+        Collection<? extends Map.Entry<K, V>> entries,
+        final GridFutureAdapter<Object> resFut,
+        @Nullable final Collection<K> activeKeys,
+        final int remaps
+    ) {
+        assert entries != null;
+
+        Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
+
+        boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+        for (Map.Entry<K, V> entry : entries) {
+            List<ClusterNode> nodes;
+
+            try {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                if (initPda) {
+                    jobPda = new DataStreamerPda(key, entry.getValue(), updater);
+
+                    initPda = false;
+                }
+
+                nodes = nodes(key);
+            }
+            catch (IgniteCheckedException e) {
+                resFut.onDone(e);
+
+                return;
+            }
+
+            if (F.isEmpty(nodes)) {
+                resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+                    "(no nodes with cache found in topology) [infos=" + entries.size() +
+                    ", cacheName=" + cacheName + ']'));
+
+                return;
+            }
+
+            for (ClusterNode node : nodes) {
+                Collection<Map.Entry<K, V>> col = mappings.get(node);
+
+                if (col == null)
+                    mappings.put(node, col = new ArrayList<>());
+
+                col.add(entry);
+            }
+        }
+
+        for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
+            final UUID nodeId = e.getKey().id();
+
+            Buffer buf = bufMappings.get(nodeId);
+
+            if (buf == null) {
+                Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+                if (old != null)
+                    buf = old;
+            }
+
+            final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
+
+            IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> t) {
+                    try {
+                        t.get();
+
+                        if (activeKeys != null) {
+                            for (Map.Entry<K, V> e : entriesForNode)
+                                activeKeys.remove(e.getKey());
+
+                            if (activeKeys.isEmpty())
+                                resFut.onDone();
+                        }
+                        else {
+                            assert entriesForNode.size() == 1;
+
+                            // That has been a single key,
+                            // so complete result future right away.
+                            resFut.onDone();
+                        }
+                    }
+                    catch (IgniteCheckedException e1) {
+                        if (log.isDebugEnabled())
+                            log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+                        if (cancelled) {
+                            resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                                IgniteDataStreamerImpl.this, e1));
+                        }
+                        else if (remaps + 1 > maxRemapCnt) {
+                            resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+                                + remaps), e1);
+                        }
+                        else
+                            load0(entriesForNode, resFut, activeKeys, remaps + 1);
+                    }
+                }
+            };
+
+            GridFutureAdapter<?> f;
+
+            try {
+                f = buf.update(entriesForNode, lsnr);
+            }
+            catch (IgniteInterruptedCheckedException e1) {
+                resFut.onDone(e1);
+
+                return;
+            }
+
+            if (ctx.discovery().node(nodeId) == null) {
+                if (bufMappings.remove(nodeId, buf))
+                    buf.onNodeLeft();
+
+                if (f != null)
+                    f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                        "(node has left): " + nodeId));
+            }
+        }
+    }
+
+    /**
+     * @param key Key to map.
+     * @return Nodes to send requests to.
+     * @throws IgniteCheckedException If failed.
+     */
+    private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
+        GridAffinityProcessor aff = ctx.affinity();
+
+        return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
+            Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+    }
+
+    /**
+     * Performs flush.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void doFlush() throws IgniteCheckedException {
+        lastFlushTime = U.currentTimeMillis();
+
+        List<IgniteInternalFuture> activeFuts0 = null;
+
+        int doneCnt = 0;
+
+        for (IgniteInternalFuture<?> f : activeFuts) {
+            if (!f.isDone()) {
+                if (activeFuts0 == null)
+                    activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
+
+                activeFuts0.add(f);
+            }
+            else {
+                f.get();
+
+                doneCnt++;
+            }
+        }
+
+        if (activeFuts0 == null || activeFuts0.isEmpty())
+            return;
+
+        while (true) {
+            Queue<IgniteInternalFuture<?>> q = null;
+
+            for (Buffer buf : bufMappings.values()) {
+                IgniteInternalFuture<?> flushFut = buf.flush();
+
+                if (flushFut != null) {
+                    if (q == null)
+                        q = new ArrayDeque<>(bufMappings.size() * 2);
+
+                    q.add(flushFut);
+                }
+            }
+
+            if (q != null) {
+                assert !q.isEmpty();
+
+                boolean err = false;
+
+                for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
+                    try {
+                        fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to flush buffer: " + e);
+
+                        err = true;
+                    }
+                }
+
+                if (err)
+                    // Remaps needed - flush buffers.
+                    continue;
+            }
+
+            doneCnt = 0;
+
+            for (int i = 0; i < activeFuts0.size(); i++) {
+                IgniteInternalFuture f = activeFuts0.get(i);
+
+                if (f == null)
+                    doneCnt++;
+                else if (f.isDone()) {
+                    f.get();
+
+                    doneCnt++;
+
+                    activeFuts0.set(i, null);
+                }
+                else
+                    break;
+            }
+
+            if (doneCnt == activeFuts0.size())
+                return;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public void flush() throws IgniteException {
+        enterBusy();
+
+        try {
+            doFlush();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * Flushes every internal buffer if buffer was flushed before passed in
+     * threshold.
+     * <p>
+     * Does not wait for result and does not fail on errors assuming that this method
+     * should be called periodically.
+     */
+    @Override public void tryFlush() throws IgniteInterruptedException {
+        if (!busyLock.enterBusy())
+            return;
+
+        try {
+            for (Buffer buf : bufMappings.values())
+                buf.flush();
+
+            lastFlushTime = U.currentTimeMillis();
+        }
+        catch (IgniteInterruptedCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            leaveBusy();
+        }
+    }
+
+    /**
+     * @param cancel {@code True} to close with cancellation.
+     * @throws IgniteException If failed.
+     */
+    @Override public void close(boolean cancel) throws IgniteException {
+        try {
+            closeEx(cancel);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * @param cancel {@code True} to close with cancellation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void closeEx(boolean cancel) throws IgniteCheckedException {
+        if (!closed.compareAndSet(false, true))
+            return;
+
+        busyLock.block();
+
+        if (log.isDebugEnabled())
+            log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
+
+        IgniteCheckedException e = null;
+
+        try {
+            // Assuming that no methods are called on this loader after this method is called.
+            if (cancel) {
+                cancelled = true;
+
+                for (Buffer buf : bufMappings.values())
+                    buf.cancelAll();
+            }
+            else
+                doFlush();
+
+            ctx.event().removeLocalEventListener(discoLsnr);
+
+            ctx.io().removeMessageListener(topic);
+        }
+        catch (IgniteCheckedException e0) {
+            e = e0;
+        }
+
+        fut.onDone(null, e);
+
+        if (e != null)
+            throw e;
+    }
+
+    /**
+     * @return {@code true} If the loader is closed.
+     */
+    boolean isClosed() {
+        return fut.isDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteException {
+        close(false);
+    }
+
+    /**
+     * @return Max remap count.
+     */
+    public int maxRemapCount() {
+        return maxRemapCnt;
+    }
+
+    /**
+     * @param maxRemapCnt New max remap count.
+     */
+    public void maxRemapCount(int maxRemapCnt) {
+        this.maxRemapCnt = maxRemapCnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDataStreamerImpl.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getDelay(TimeUnit unit) {
+        return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @return Next flush time.
+     */
+    private long nextFlushTime() {
+        return lastFlushTime + autoFlushFreq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(Delayed o) {
+        return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1;
+    }
+
+    /**
+     *
+     */
+    private class Buffer {
+        /** Node. */
+        private final ClusterNode node;
+
+        /** Active futures. */
+        private final Collection<IgniteInternalFuture<Object>> locFuts;
+
+        /** Buffered entries. */
+        private List<Map.Entry<K, V>> entries;
+
+        /** */
+        @GridToStringExclude
+        private GridFutureAdapter<Object> curFut;
+
+        /** Local node flag. */
+        private final boolean isLocNode;
+
+        /** ID generator. */
+        private final AtomicLong idGen = new AtomicLong();
+
+        /** Active futures. */
+        private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+        /** */
+        private final Semaphore sem;
+
+        /** Closure to signal on task finish. */
+        @GridToStringExclude
+        private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
+            @Override public void apply(IgniteInternalFuture<Object> t) {
+                signalTaskFinished(t);
+            }
+        };
+
+        /**
+         * @param node Node.
+         */
+        Buffer(ClusterNode node) {
+            assert node != null;
+
+            this.node = node;
+
+            locFuts = new GridConcurrentHashSet<>();
+            reqs = new ConcurrentHashMap8<>();
+
+            // Cache local node flag.
+            isLocNode = node.equals(ctx.discovery().localNode());
+
+            entries = newEntries();
+            curFut = new GridFutureAdapter<>(ctx);
+            curFut.listenAsync(signalC);
+
+            sem = new Semaphore(parallelOps);
+        }
+
+        /**
+         * @param newEntries Infos.
+         * @param lsnr Listener for the operation future.
+         * @throws IgniteInterruptedCheckedException If failed.
+         * @return Future for operation.
+         */
+        @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
+            IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+            List<Map.Entry<K, V>> entries0 = null;
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+
+                curFut0.listenAsync(lsnr);
+
+                for (Map.Entry<K, V> entry : newEntries)
+                    entries.add(entry);
+
+                if (entries.size() >= bufSize) {
+                    entries0 = entries;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>(ctx);
+                    curFut.listenAsync(signalC);
+                }
+            }
+
+            if (entries0 != null) {
+                submit(entries0, curFut0);
+
+                if (cancelled)
+                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this));
+            }
+
+            return curFut0;
+        }
+
+        /**
+         * @return Fresh collection with some space for outgrowth.
+         */
+        private List<Map.Entry<K, V>> newEntries() {
+            return new ArrayList<>((int)(bufSize * 1.2));
+        }
+
+        /**
+         * @return Future if any submitted.
+         *
+         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+         */
+        @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
+            List<Map.Entry<K, V>> entries0 = null;
+            GridFutureAdapter<Object> curFut0 = null;
+
+            synchronized (this) {
+                if (!entries.isEmpty()) {
+                    entries0 = entries;
+                    curFut0 = curFut;
+
+                    entries = newEntries();
+                    curFut = new GridFutureAdapter<>(ctx);
+                    curFut.listenAsync(signalC);
+                }
+            }
+
+            if (entries0 != null)
+                submit(entries0, curFut0);
+
+            // Create compound future for this flush.
+            GridCompoundFuture<Object, Object> res = null;
+
+            for (IgniteInternalFuture<Object> f : locFuts) {
+                if (res == null)
+                    res = new GridCompoundFuture<>(ctx);
+
+                res.add(f);
+            }
+
+            for (IgniteInternalFuture<Object> f : reqs.values()) {
+                if (res == null)
+                    res = new GridCompoundFuture<>(ctx);
+
+                res.add(f);
+            }
+
+            if (res != null)
+                res.markInitialized();
+
+            return res;
+        }
+
+        /**
+         * Increments active tasks count.
+         *
+         * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+         */
+        private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
+            U.acquire(sem);
+        }
+
+        /**
+         * @param f Future that finished.
+         */
+        private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+            assert f != null;
+
+            sem.release();
+        }
+
+        /**
+         * @param entries Entries to submit.
+         * @param curFut Current future.
+         * @throws IgniteInterruptedCheckedException If interrupted.
+         */
+        private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
+            throws IgniteInterruptedCheckedException {
+            assert entries != null;
+            assert !entries.isEmpty();
+            assert curFut != null;
+
+            incrementActiveTasks();
+
+            IgniteInternalFuture<Object> fut;
+
+            if (isLocNode) {
+                fut = ctx.closure().callLocalSafe(
+                    new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+
+                locFuts.add(fut);
+
+                fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                    @Override public void apply(IgniteInternalFuture<Object> t) {
+                        try {
+                            boolean rmv = locFuts.remove(t);
+
+                            assert rmv;
+
+                            curFut.onDone(t.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            curFut.onDone(e);
+                        }
+                    }
+                });
+            }
+            else {
+                byte[] entriesBytes;
+
+                try {
+                    if (compact) {
+                        entriesBytes = ctx.config().getMarshaller()
+                            .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
+                    }
+                    else
+                        entriesBytes = ctx.config().getMarshaller().marshal(entries);
+
+                    if (updaterBytes == null) {
+                        assert updater != null;
+
+                        updaterBytes = ctx.config().getMarshaller().marshal(updater);
+                    }
+
+                    if (topicBytes == null)
+                        topicBytes = ctx.config().getMarshaller().marshal(topic);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to marshal (request will not be sent).", e);
+
+                    return;
+                }
+
+                GridDeployment dep = null;
+                GridPeerDeployAware jobPda0 = null;
+
+                if (ctx.deploy().enabled()) {
+                    try {
+                        jobPda0 = jobPda;
+
+                        assert jobPda0 != null;
+
+                        dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+
+                        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+                        if (cache != null)
+                            cache.context().deploy().onEnter();
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+
+                        return;
+                    }
+
+                    if (dep == null)
+                        U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+                }
+
+                long reqId = idGen.incrementAndGet();
+
+                fut = curFut;
+
+                reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+
+                GridDataLoadRequest req = new GridDataLoadRequest(
+                    reqId,
+                    topicBytes,
+                    cacheName,
+                    updaterBytes,
+                    entriesBytes,
+                    true,
+                    skipStore,
+                    dep != null ? dep.deployMode() : null,
+                    dep != null ? jobPda0.deployClass().getName() : null,
+                    dep != null ? dep.userVersion() : null,
+                    dep != null ? dep.participants() : null,
+                    dep != null ? dep.classLoaderId() : null,
+                    dep == null);
+
+                try {
+                    ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+                }
+                catch (IgniteCheckedException e) {
+                    if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                        ((GridFutureAdapter<Object>)fut).onDone(e);
+                    else
+                        ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
+                            "request (node has left): " + node.id()));
+                }
+            }
+        }
+
+        /**
+         *
+         */
+        void onNodeLeft() {
+            assert !isLocNode;
+            assert bufMappings.get(node.id()) != this;
+
+            if (log.isDebugEnabled())
+                log.debug("Forcibly completing futures (node has left): " + node.id());
+
+            Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
+                "(node has left): " + node.id());
+
+            for (GridFutureAdapter<Object> f : reqs.values())
+                f.onDone(e);
+
+            // Make sure to complete current future.
+            GridFutureAdapter<Object> curFut0;
+
+            synchronized (this) {
+                curFut0 = curFut;
+            }
+
+            curFut0.onDone(e);
+        }
+
+        /**
+         * @param res Response.
+         */
+        void onResponse(GridDataLoadResponse res) {
+            if (log.isDebugEnabled())
+                log.debug("Received data load response: " + res);
+
+            GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+            if (f == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Future for request has not been found: " + res.requestId());
+
+                return;
+            }
+
+            Throwable err = null;
+
+            byte[] errBytes = res.errorBytes();
+
+            if (errBytes != null) {
+                try {
+                    GridPeerDeployAware jobPda0 = jobPda;
+
+                    err = ctx.config().getMarshaller().unmarshal(
+                        errBytes,
+                        jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
+                }
+                catch (IgniteCheckedException e) {
+                    f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
+
+                    return;
+                }
+            }
+
+            f.onDone(null, err);
+
+            if (log.isDebugEnabled())
+                log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
+        }
+
+        /**
+         *
+         */
+        void cancelAll() {
+            IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this);
+
+            for (IgniteInternalFuture<?> f : locFuts) {
+                try {
+                    f.cancel();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to cancel mini-future.", e);
+                }
+            }
+
+            for (GridFutureAdapter<?> f : reqs.values())
+                f.onDone(err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            int size;
+
+            synchronized (this) {
+                size = entries.size();
+            }
+
+            return S.toString(Buffer.class, this,
+                "entriesCnt", size,
+                "locFutsSize", locFuts.size(),
+                "reqsSize", reqs.size());
+        }
+    }
+
+    /**
+     * Data streamer peer-deploy aware.
+     */
+    private class DataStreamerPda implements GridPeerDeployAware {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Deploy class. */
+        private Class<?> cls;
+
+        /** Class loader. */
+        private ClassLoader ldr;
+
+        /** Collection of objects to detect deploy class and class loader. */
+        private Collection<Object> objs;
+
+        /**
+         * Constructs data streamer peer-deploy aware.
+         *
+         * @param objs Collection of objects to detect deploy class and class loader.
+         */
+        private DataStreamerPda(Object... objs) {
+            this.objs = Arrays.asList(objs);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Class<?> deployClass() {
+            if (cls == null) {
+                Class<?> cls0 = null;
+
+                if (depCls != null)
+                    cls0 = depCls;
+                else {
+                    for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+                        Object o = it.next();
+
+                        if (o != null)
+                            cls0 = U.detectClass(o);
+                    }
+
+                    if (cls0 == null || U.isJdk(cls0))
+                        cls0 = IgniteDataStreamerImpl.class;
+                }
+
+                assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
+
+                cls = cls0;
+            }
+
+            return cls;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClassLoader classLoader() {
+            if (ldr == null) {
+                ClassLoader ldr0 = deployClass().getClassLoader();
+
+                // Safety.
+                if (ldr0 == null)
+                    ldr0 = U.gridClassLoader();
+
+                assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
+
+                ldr = ldr0;
+            }
+
+            return ldr;
+        }
+    }
+
+    /**
+     * Entry.
+     */
+    private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private K key;
+
+        /** */
+        private V val;
+
+        /**
+         * @param key Key.
+         * @param val Value.
+         */
+        private Entry0(K key, @Nullable V val) {
+            assert key != null;
+
+            this.key = key;
+            this.val = val;
+        }
+
+        /**
+         * For {@link Externalizable}.
+         */
+        @SuppressWarnings("UnusedDeclaration")
+        public Entry0() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public K getKey() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V getValue() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public V setValue(V val) {
+            V old = this.val;
+
+            this.val = val;
+
+            return old;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(key);
+            out.writeObject(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            key = (K)in.readObject();
+            val = (V)in.readObject();
+        }
+    }
+
+    /**
+     * Wrapper list with special compact serialization of map entries.
+     */
+    private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**  Wrapped delegate. */
+        private Collection<Map.Entry<K, V>> delegate;
+
+        /** Optional portable processor for converting values. */
+        private GridPortableProcessor portable;
+
+        /**
+         * @param delegate Delegate.
+         * @param portable Portable processor.
+         */
+        private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
+            this.delegate = delegate;
+            this.portable = portable;
+        }
+
+        /**
+         * For {@link Externalizable}.
+         */
+        public Entries0() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<Entry<K, V>> iterator() {
+            return delegate.iterator();
+        }
+
+        /** {@inheritDoc} */
+        @Override public int size() {
+            return delegate.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(delegate.size());
+
+            boolean portableEnabled = portable != null;
+
+            for (Map.Entry<K, V> entry : delegate) {
+                if (portableEnabled) {
+                    out.writeObject(portable.marshalToPortable(entry.getKey()));
+                    out.writeObject(portable.marshalToPortable(entry.getValue()));
+                }
+                else {
+                    out.writeObject(entry.getKey());
+                    out.writeObject(entry.getValue());
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            int sz = in.readInt();
+
+            delegate = new ArrayList<>(sz);
+
+            for (int i = 0; i < sz; i++) {
+                Object k = in.readObject();
+                Object v = in.readObject();
+
+                delegate.add(new Entry0<>((K)k, (V)v));
+            }
+        }
+    }
+
+    /**
+     * Isolated updater which only loads entry initial value.
+     */
+    private static class IsolatedUpdater<K, V> implements Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
+
+            GridCacheAdapter<K, V> internalCache = proxy.context().cache();
+
+            if (internalCache.isNear())
+                internalCache = internalCache.context().near().dht();
+
+            GridCacheContext<K, V> cctx = internalCache.context();
+
+            long topVer = cctx.affinity().affinityTopologyVersion();
+
+            GridCacheVersion ver = cctx.versions().next(topVer);
+
+            boolean portable = cctx.portableEnabled();
+
+            for (Map.Entry<K, V> e : entries) {
+                try {
+                    K key = e.getKey();
+                    V val = e.getValue();
+
+                    if (portable) {
+                        key = (K)cctx.marshalToPortable(key);
+                        val = (V)cctx.marshalToPortable(val);
+                    }
+
+                    GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
+
+                    entry.unswap(true, false);
+
+                    entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
+                        GridDrType.DR_LOAD);
+
+                    cctx.evicts().touch(entry, topVer);
+                }
+                catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
+                    // No-op.
+                }
+                catch (IgniteCheckedException ex) {
+                    IgniteLogger log = cache.unwrap(Ignite.class).log();
+
+                    U.error(log, "Failed to set initial value for cache entry: " + e, ex);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
new file mode 100644
index 0000000..9dd4a8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ *
+ */
+public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
+    /** Loaders map (access is not supposed to be highly concurrent). */
+    private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** Flushing thread. */
+    private Thread flusher;
+
+    /** */
+    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
+
+    /** Marshaller. */
+    private final Marshaller marsh;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public IgniteDataStreamerProcessor(GridKernalContext ctx) {
+        super(ctx);
+
+        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                assert msg instanceof GridDataLoadRequest;
+
+                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
+            }
+        });
+
+        marsh = ctx.config().getMarshaller();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        if (ctx.config().isDaemon())
+            return;
+
+        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
+            @Override protected void body() throws InterruptedException {
+                while (!isCancelled()) {
+                    IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
+
+                    if (!busyLock.enterBusy())
+                        return;
+
+                    try {
+                        if (ldr.isClosed())
+                            continue;
+
+                        ldr.tryFlush();
+
+                        flushQ.offer(ldr);
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
+                }
+            }
+        });
+
+        flusher.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Started data streamer processor.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        if (ctx.config().isDaemon())
+            return;
+
+        ctx.io().removeMessageListener(TOPIC_DATALOAD);
+
+        busyLock.block();
+
+        U.interrupt(flusher);
+        U.join(flusher, log);
+
+        for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
+            if (log.isDebugEnabled())
+                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
+
+            try {
+                ldr.closeEx(cancel);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close data streamer: " + ldr, e);
+            }
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Stopped data streamer processor.");
+    }
+
+    /**
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @param compact {@code true} if data streamer should transfer data in compact format.
+     * @return Data streamer.
+     */
+    public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
+
+        try {
+            final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
+
+            ldrs.add(ldr);
+
+            ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
+                @Override public void apply(IgniteInternalFuture<?> f) {
+                    boolean b = ldrs.remove(ldr);
+
+                    assert b : "Loader has not been added to set: " + ldr;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Loader has been completed: " + ldr);
+                }
+            });
+
+            return ldr;
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param cacheName Cache name ({@code null} for default cache).
+     * @return Data streamer.
+     */
+    public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
+        return dataStreamer(cacheName, true);
+    }
+
+    /**
+     * @param nodeId Sender ID.
+     * @param req Request.
+     */
+    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
+        if (!busyLock.enterBusy()) {
+            if (log.isDebugEnabled())
+                log.debug("Ignoring data load request (node is stopping): " + req);
+
+            return;
+        }
+
+        try {
+            if (log.isDebugEnabled())
+                log.debug("Processing data load request: " + req);
+
+            Object topic;
+
+            try {
+                topic = marsh.unmarshal(req.responseTopicBytes(), null);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal topic from request: " + req, e);
+
+                return;
+            }
+
+            ClassLoader clsLdr;
+
+            if (req.forceLocalDeployment())
+                clsLdr = U.gridClassLoader();
+            else {
+                GridDeployment dep = ctx.deploy().getGlobalDeployment(
+                    req.deploymentMode(),
+                    req.sampleClassName(),
+                    req.sampleClassName(),
+                    req.userVersion(),
+                    nodeId,
+                    req.classLoaderId(),
+                    req.participants(),
+                    null);
+
+                if (dep == null) {
+                    sendResponse(nodeId,
+                        topic,
+                        req.requestId(),
+                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
+                            ", req=" + req + ']'),
+                        false);
+
+                    return;
+                }
+
+                clsLdr = dep.classLoader();
+            }
+
+            Collection<Map.Entry<K, V>> col;
+            IgniteDataStreamer.Updater<K, V> updater;
+
+            try {
+                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
+                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
+
+                sendResponse(nodeId, topic, req.requestId(), e, false);
+
+                return;
+            }
+
+            IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
+                log,
+                req.cacheName(),
+                col,
+                req.ignoreDeploymentOwnership(),
+                req.skipStore(),
+                updater);
+
+            Exception err = null;
+
+            try {
+                job.call();
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to finish update job.", e);
+
+                err = e;
+            }
+
+            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param resTopic Response topic.
+     * @param reqId Request ID.
+     * @param err Error.
+     * @param forceLocDep Force local deployment.
+     */
+    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
+        boolean forceLocDep) {
+        byte[] errBytes;
+
+        try {
+            errBytes = err != null ? marsh.marshal(err) : null;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to marshal message.", e);
+
+            return;
+        }
+
+        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
+
+        try {
+            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (ctx.discovery().alive(nodeId))
+                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+            else if (log.isDebugEnabled())
+                log.debug("Node has left the grid: " + nodeId);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        X.println(">>>");
+        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>>   ldrsSize: " + ldrs.size());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java
new file mode 100644
index 0000000..9a2f3c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
+    /** */
+    private final GridKernalContext ctx;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Entries to put. */
+    private final Collection<Map.Entry<K, V>> col;
+
+    /** {@code True} to ignore deployment ownership. */
+    private final boolean ignoreDepOwnership;
+
+    /** */
+    private final boolean skipStore;
+
+    /** */
+    private final IgniteDataStreamer.Updater<K, V> updater;
+
+    /**
+     * @param ctx Context.
+     * @param log Log.
+     * @param cacheName Cache name.
+     * @param col Entries to put.
+     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+     * @param updater Updater.
+     */
+    IgniteDataStreamerUpdateJob(
+        GridKernalContext ctx,
+        IgniteLogger log,
+        @Nullable String cacheName,
+        Collection<Map.Entry<K, V>> col,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        IgniteDataStreamer.Updater<K, V> updater) {
+        this.ctx = ctx;
+        this.log = log;
+
+        assert col != null && !col.isEmpty();
+        assert updater != null;
+
+        this.cacheName = cacheName;
+        this.col = col;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.updater = updater;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object call() throws Exception {
+        if (log.isDebugEnabled())
+            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+
+//        TODO IGNITE-77: restore adapter usage.
+//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+//
+//        IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+//        if (!f.isDone())
+//            f.get();
+//
+//        if (ignoreDepOwnership)
+//            cache.context().deploy().ignoreOwnership(true);
+
+        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
+
+        if (skipStore)
+            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
+
+        if (ignoreDepOwnership)
+            cache.context().deploy().ignoreOwnership(true);
+
+        try {
+            updater.update(cache, col);
+
+            return null;
+        }
+        finally {
+            if (ignoreDepOwnership)
+                cache.context().deploy().ignoreOwnership(false);
+
+            if (log.isDebugEnabled())
+                log.debug("Update job finished on node: " + ctx.localNodeId());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html
new file mode 100644
index 0000000..1090b86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+    <!-- Package description. -->
+    Data streamer processor.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 15309bb..6c922bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
deleted file mode 100644
index 306e615..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Tests for {@code IgniteDataStreamerImpl}.
- */
-public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of keys to load via data streamer. */
-    private static final int KEYS_COUNT = 1000;
-
-    /** Started grid counter. */
-    private static int cnt;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        // Forth node goes without cache.
-        if (cnt < 4)
-            cfg.setCacheConfiguration(cacheConfiguration());
-
-        cnt++;
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
-        try {
-            startGrids(5);
-
-            final CyclicBarrier barrier = new CyclicBarrier(2);
-
-            multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    U.awaitQuiet(barrier);
-
-                    G.stopAll(true);
-
-                    return null;
-                }
-            }, 1);
-
-            Ignite g4 = grid(4);
-
-            IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
-
-            dataLdr.perNodeBufferSize(32);
-
-            for (int i = 0; i < 100000; i += 2) {
-                dataLdr.addData(i, i);
-                dataLdr.removeData(i + 1);
-            }
-
-            U.awaitQuiet(barrier);
-
-            info("Closing data streamer.");
-
-            try {
-                dataLdr.close(true);
-            }
-            catch (IllegalStateException ignore) {
-                // This is ok to ignore this exception as test is racy by it's nature -
-                // grid is stopping in different thread.
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Data streamer should correctly load entries from HashMap in case of grids with more than one node
-     *  and with GridOptimizedMarshaller that requires serializable.
-     *
-     * @throws Exception If failed.
-     */
-    public void testAddDataFromMap() throws Exception {
-        try {
-            cnt = 0;
-
-            startGrids(2);
-
-            Ignite g0 = grid(0);
-
-            Marshaller marsh = g0.configuration().getMarshaller();
-
-            if (marsh instanceof OptimizedMarshaller)
-                assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable());
-            else
-                fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
-
-            IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
-
-            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
-
-            for (int i = 0; i < KEYS_COUNT; i ++)
-                map.put(i, String.valueOf(i));
-
-            dataLdr.addData(map);
-
-            dataLdr.close();
-
-            Random rnd = new Random();
-
-            IgniteCache<Integer, String> c = g0.jcache(null);
-
-            for (int i = 0; i < KEYS_COUNT; i ++) {
-                Integer k = rnd.nextInt(KEYS_COUNT);
-
-                String v = c.get(k);
-
-                assertEquals(k.toString(), v);
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Gets cache configuration.
-     *
-     * @return Cache configuration.
-     */
-    private CacheConfiguration cacheConfiguration() {
-        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-        cacheCfg.setCacheMode(PARTITIONED);
-        cacheCfg.setBackups(1);
-        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        return cacheCfg;
-    }
-
-    /**
-     *
-     */
-    private static class TestObject implements Serializable {
-        /** */
-        private int val;
-
-        /**
-         */
-        private TestObject() {
-            // No-op.
-        }
-
-        /**
-         * @param val Value.
-         */
-        private TestObject(int val) {
-            this.val = val;
-        }
-
-        public Integer val() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj instanceof TestObject && ((TestObject)obj).val == val;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
deleted file mode 100644
index 22a1f97..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jdk8.backport.*;
-
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Data streamer performance test. Compares group lock data streamer to traditional lock.
- * <p>
- * Disable assertions and give at least 2 GB heap to run this test.
- */
-public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int GRID_CNT = 3;
-
-    /** */
-    private static final int ENTRY_CNT = 80000;
-
-    /** */
-    private boolean useCache;
-
-    /** */
-    private String[] vals = new String[2048];
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(spi);
-
-        cfg.setIncludeProperties();
-
-        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
-        cfg.setConnectorConfiguration(null);
-
-        cfg.setPeerClassLoadingEnabled(true);
-
-        if (useCache) {
-            CacheConfiguration cc = defaultCacheConfiguration();
-
-            cc.setCacheMode(PARTITIONED);
-
-            cc.setDistributionMode(PARTITIONED_ONLY);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-            cc.setStartSize(ENTRY_CNT / GRID_CNT);
-            cc.setSwapEnabled(false);
-
-            cc.setBackups(1);
-
-            cc.setStoreValueBytes(true);
-
-            cfg.setCacheSanityCheckEnabled(false);
-            cfg.setCacheConfiguration(cc);
-        }
-        else
-            cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        for (int i = 0; i < vals.length; i++) {
-            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
-
-            StringBuilder sb = new StringBuilder();
-
-            for (int j = 0; j < valLen; j++)
-                sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
-
-            vals[i] = sb.toString();
-
-            info("Value: " + vals[i]);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPerformance() throws Exception {
-        doTest();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void doTest() throws Exception {
-        System.gc();
-        System.gc();
-        System.gc();
-
-        try {
-            useCache = true;
-
-            startGridsMultiThreaded(GRID_CNT);
-
-            useCache = false;
-
-            Ignite ignite = startGrid();
-
-            final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
-
-            ldr.perNodeBufferSize(8192);
-            ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted());
-            ldr.autoFlushFrequency(0);
-
-            final LongAdder cnt = new LongAdder();
-
-            long start = U.currentTimeMillis();
-
-            Thread t = new Thread(new Runnable() {
-                @SuppressWarnings("BusyWait")
-                @Override public void run() {
-                    while (true) {
-                        try {
-                            Thread.sleep(10000);
-                        }
-                        catch (InterruptedException ignored) {
-                            break;
-                        }
-
-                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
-                    }
-                }
-            });
-
-            t.setDaemon(true);
-
-            t.start();
-
-            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
-
-            multithreaded(new Callable<Object>() {
-                @SuppressWarnings("InfiniteLoopStatement")
-                @Override public Object call() throws Exception {
-                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
-
-                    while (true) {
-                        int i = rnd.nextInt(ENTRY_CNT);
-
-                        ldr.addData(i, vals[rnd.nextInt(vals.length)]);
-
-                        cnt.increment();
-                    }
-                }
-            }, threadNum, "loader");
-
-            info("Closing loader...");
-
-            ldr.close(false);
-
-            long duration = U.currentTimeMillis() - start;
-
-            info("Finished performance test. Duration: " + duration + "ms.");
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-}


[4/5] incubator-ignite git commit: # gg-9869

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
deleted file mode 100644
index 7db41e6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- *
- */
-public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
-    /** Loaders map (access is not supposed to be highly concurrent). */
-    private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** Flushing thread. */
-    private Thread flusher;
-
-    /** */
-    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
-
-    /** Marshaller. */
-    private final Marshaller marsh;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public IgniteDataStreamerProcessor(GridKernalContext ctx) {
-        super(ctx);
-
-        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof GridDataLoadRequest;
-
-                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
-            }
-        });
-
-        marsh = ctx.config().getMarshaller();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
-
-        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
-            @Override protected void body() throws InterruptedException {
-                while (!isCancelled()) {
-                    IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
-
-                    if (!busyLock.enterBusy())
-                        return;
-
-                    try {
-                        if (ldr.isClosed())
-                            continue;
-
-                        ldr.tryFlush();
-
-                        flushQ.offer(ldr);
-                    }
-                    finally {
-                        busyLock.leaveBusy();
-                    }
-                }
-            }
-        });
-
-        flusher.start();
-
-        if (log.isDebugEnabled())
-            log.debug("Started data streamer processor.");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        if (ctx.config().isDaemon())
-            return;
-
-        ctx.io().removeMessageListener(TOPIC_DATALOAD);
-
-        busyLock.block();
-
-        U.interrupt(flusher);
-        U.join(flusher, log);
-
-        for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
-            if (log.isDebugEnabled())
-                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
-
-            try {
-                ldr.closeEx(cancel);
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to close data streamer: " + ldr, e);
-            }
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Stopped data streamer processor.");
-    }
-
-    /**
-     * @param cacheName Cache name ({@code null} for default cache).
-     * @param compact {@code true} if data streamer should transfer data in compact format.
-     * @return Data streamer.
-     */
-    public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
-
-        try {
-            final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
-
-            ldrs.add(ldr);
-
-            ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> f) {
-                    boolean b = ldrs.remove(ldr);
-
-                    assert b : "Loader has not been added to set: " + ldr;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Loader has been completed: " + ldr);
-                }
-            });
-
-            return ldr;
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * @param cacheName Cache name ({@code null} for default cache).
-     * @return Data streamer.
-     */
-    public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
-        return dataStreamer(cacheName, true);
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param req Request.
-     */
-    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
-        if (!busyLock.enterBusy()) {
-            if (log.isDebugEnabled())
-                log.debug("Ignoring data load request (node is stopping): " + req);
-
-            return;
-        }
-
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Processing data load request: " + req);
-
-            Object topic;
-
-            try {
-                topic = marsh.unmarshal(req.responseTopicBytes(), null);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal topic from request: " + req, e);
-
-                return;
-            }
-
-            ClassLoader clsLdr;
-
-            if (req.forceLocalDeployment())
-                clsLdr = U.gridClassLoader();
-            else {
-                GridDeployment dep = ctx.deploy().getGlobalDeployment(
-                    req.deploymentMode(),
-                    req.sampleClassName(),
-                    req.sampleClassName(),
-                    req.userVersion(),
-                    nodeId,
-                    req.classLoaderId(),
-                    req.participants(),
-                    null);
-
-                if (dep == null) {
-                    sendResponse(nodeId,
-                        topic,
-                        req.requestId(),
-                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
-                            ", req=" + req + ']'),
-                        false);
-
-                    return;
-                }
-
-                clsLdr = dep.classLoader();
-            }
-
-            Collection<Map.Entry<K, V>> col;
-            IgniteDataStreamer.Updater<K, V> updater;
-
-            try {
-                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
-                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
-
-                sendResponse(nodeId, topic, req.requestId(), e, false);
-
-                return;
-            }
-
-            IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
-                log,
-                req.cacheName(),
-                col,
-                req.ignoreDeploymentOwnership(),
-                req.skipStore(),
-                updater);
-
-            Exception err = null;
-
-            try {
-                job.call();
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to finish update job.", e);
-
-                err = e;
-            }
-
-            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param resTopic Response topic.
-     * @param reqId Request ID.
-     * @param err Error.
-     * @param forceLocDep Force local deployment.
-     */
-    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
-        boolean forceLocDep) {
-        byte[] errBytes;
-
-        try {
-            errBytes = err != null ? marsh.marshal(err) : null;
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to marshal message.", e);
-
-            return;
-        }
-
-        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
-
-        try {
-            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
-        }
-        catch (IgniteCheckedException e) {
-            if (ctx.discovery().alive(nodeId))
-                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
-            else if (log.isDebugEnabled())
-                log.debug("Node has left the grid: " + nodeId);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void printMemoryStats() {
-        X.println(">>>");
-        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
-        X.println(">>>   ldrsSize: " + ldrs.size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
deleted file mode 100644
index 1a3db40..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
-    /** */
-    private final GridKernalContext ctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** Cache name. */
-    private final String cacheName;
-
-    /** Entries to put. */
-    private final Collection<Map.Entry<K, V>> col;
-
-    /** {@code True} to ignore deployment ownership. */
-    private final boolean ignoreDepOwnership;
-
-    /** */
-    private final boolean skipStore;
-
-    /** */
-    private final IgniteDataStreamer.Updater<K, V> updater;
-
-    /**
-     * @param ctx Context.
-     * @param log Log.
-     * @param cacheName Cache name.
-     * @param col Entries to put.
-     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
-     * @param updater Updater.
-     */
-    IgniteDataStreamerUpdateJob(
-        GridKernalContext ctx,
-        IgniteLogger log,
-        @Nullable String cacheName,
-        Collection<Map.Entry<K, V>> col,
-        boolean ignoreDepOwnership,
-        boolean skipStore,
-        IgniteDataStreamer.Updater<K, V> updater) {
-        this.ctx = ctx;
-        this.log = log;
-
-        assert col != null && !col.isEmpty();
-        assert updater != null;
-
-        this.cacheName = cacheName;
-        this.col = col;
-        this.ignoreDepOwnership = ignoreDepOwnership;
-        this.skipStore = skipStore;
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object call() throws Exception {
-        if (log.isDebugEnabled())
-            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
-
-//        TODO IGNITE-77: restore adapter usage.
-//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-//
-//        IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-//        if (!f.isDone())
-//            f.get();
-//
-//        if (ignoreDepOwnership)
-//            cache.context().deploy().ignoreOwnership(true);
-
-        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
-        if (skipStore)
-            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
-        if (ignoreDepOwnership)
-            cache.context().deploy().ignoreOwnership(true);
-
-        try {
-            updater.update(cache, col);
-
-            return null;
-        }
-        finally {
-            if (ignoreDepOwnership)
-                cache.context().deploy().ignoreOwnership(false);
-
-            if (log.isDebugEnabled())
-                log.debug("Update job finished on node: " + ctx.localNodeId());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
deleted file mode 100644
index 1090b86..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Data streamer processor.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
new file mode 100644
index 0000000..d77b52e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class GridDataLoadRequest implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long reqId;
+
+    /** */
+    private byte[] resTopicBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** */
+    private byte[] updaterBytes;
+
+    /** Entries to put. */
+    private byte[] colBytes;
+
+    /** {@code True} to ignore deployment ownership. */
+    private boolean ignoreDepOwnership;
+
+    /** */
+    private boolean skipStore;
+
+    /** */
+    private DeploymentMode depMode;
+
+    /** */
+    private String sampleClsName;
+
+    /** */
+    private String userVer;
+
+    /** Node class loader participants. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+    private Map<UUID, IgniteUuid> ldrParticipants;
+
+    /** */
+    private IgniteUuid clsLdrId;
+
+    /** */
+    private boolean forceLocDep;
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public GridDataLoadRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param resTopicBytes Response topic.
+     * @param cacheName Cache name.
+     * @param updaterBytes Cache updater.
+     * @param colBytes Collection bytes.
+     * @param ignoreDepOwnership Ignore ownership.
+     * @param skipStore Skip store flag.
+     * @param depMode Deployment mode.
+     * @param sampleClsName Sample class name.
+     * @param userVer User version.
+     * @param ldrParticipants Loader participants.
+     * @param clsLdrId Class loader ID.
+     * @param forceLocDep Force local deployment.
+     */
+    public GridDataLoadRequest(long reqId,
+        byte[] resTopicBytes,
+        @Nullable String cacheName,
+        byte[] updaterBytes,
+        byte[] colBytes,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        DeploymentMode depMode,
+        String sampleClsName,
+        String userVer,
+        Map<UUID, IgniteUuid> ldrParticipants,
+        IgniteUuid clsLdrId,
+        boolean forceLocDep) {
+        this.reqId = reqId;
+        this.resTopicBytes = resTopicBytes;
+        this.cacheName = cacheName;
+        this.updaterBytes = updaterBytes;
+        this.colBytes = colBytes;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.depMode = depMode;
+        this.sampleClsName = sampleClsName;
+        this.userVer = userVer;
+        this.ldrParticipants = ldrParticipants;
+        this.clsLdrId = clsLdrId;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Response topic.
+     */
+    public byte[] responseTopicBytes() {
+        return resTopicBytes;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Updater.
+     */
+    public byte[] updaterBytes() {
+        return updaterBytes;
+    }
+
+    /**
+     * @return Collection bytes.
+     */
+    public byte[] collectionBytes() {
+        return colBytes;
+    }
+
+    /**
+     * @return {@code True} to ignore ownership.
+     */
+    public boolean ignoreDeploymentOwnership() {
+        return ignoreDepOwnership;
+    }
+
+    /**
+     * @return Skip store flag.
+     */
+    public boolean skipStore() {
+        return skipStore;
+    }
+
+    /**
+     * @return Deployment mode.
+     */
+    public DeploymentMode deploymentMode() {
+        return depMode;
+    }
+
+    /**
+     * @return Sample class name.
+     */
+    public String sampleClassName() {
+        return sampleClsName;
+    }
+
+    /**
+     * @return User version.
+     */
+    public String userVersion() {
+        return userVer;
+    }
+
+    /**
+     * @return Participants.
+     */
+    public Map<UUID, IgniteUuid> participants() {
+        return ldrParticipants;
+    }
+
+    /**
+     * @return Class loader ID.
+     */
+    public IgniteUuid classLoaderId() {
+        return clsLdrId;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDataLoadRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeString("cacheName", cacheName))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByteArray("colBytes", colBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeString("sampleClsName", sampleClsName))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeString("userVer", userVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                cacheName = reader.readString("cacheName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                clsLdrId = reader.readIgniteUuid("clsLdrId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                colBytes = reader.readByteArray("colBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                byte depModeOrd;
+
+                depModeOrd = reader.readByte("depMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                depMode = DeploymentMode.fromOrdinal(depModeOrd);
+
+                reader.incrementState();
+
+            case 4:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                resTopicBytes = reader.readByteArray("resTopicBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                sampleClsName = reader.readString("sampleClsName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                userVer = reader.readString("userVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 62;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 13;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
new file mode 100644
index 0000000..25ff9ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+
+/**
+ *
+ */
+public class GridDataLoadResponse implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long reqId;
+
+    /** */
+    private byte[] errBytes;
+
+    /** */
+    private boolean forceLocDep;
+
+    /**
+     * @param reqId Request ID.
+     * @param errBytes Error bytes.
+     * @param forceLocDep Force local deployment.
+     */
+    public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
+        this.reqId = reqId;
+        this.errBytes = errBytes;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public GridDataLoadResponse() {
+        // No-op.
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Error bytes.
+     */
+    public byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDataLoadResponse.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 63;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..629c7b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class IgniteDataStreamerCacheUpdaters {
+    /** */
+    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+    /** */
+    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
+
+    /**
+     * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
+     * is not the best.
+     *
+     * @return Single updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+        return INDIVIDUAL;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
+     * updated concurrently. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+        return BATCHED;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
+     * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched sorted updater.
+     */
+    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
+        return BATCHED_SORTED;
+    }
+
+    /**
+     * Updates cache.
+     *
+     * @param cache Cache.
+     * @param rmvCol Keys to remove.
+     * @param putMap Entries to put.
+     * @throws IgniteException If failed.
+     */
+    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
+        Map<K, V> putMap) {
+        assert rmvCol != null || putMap != null;
+
+        // Here we assume that there are no key duplicates, so the following calls are valid.
+        if (rmvCol != null)
+            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+        if (putMap != null)
+            cache.putAll(putMap);
+    }
+
+    /**
+     * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
+     */
+    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null)
+                    cache.remove(key);
+                else
+                    cache.put(key, val);
+            }
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock prone.
+     */
+    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new HashSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new HashMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock prone.
+     */
+    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null;
+            Set<K> rmvAll = null;
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key instanceof Comparable;
+
+                V val = entry.getValue();
+
+                if (val == null) {
+                    if (rmvAll == null)
+                        rmvAll = new TreeSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new TreeMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
new file mode 100644
index 0000000..b6aa15c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Data streamer future.
+ */
+class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Data streamer. */
+    @GridToStringExclude
+    private IgniteDataStreamerImpl dataLdr;
+
+    /**
+     * Default constructor for {@link Externalizable} support.
+     */
+    public IgniteDataStreamerFuture() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Context.
+     * @param dataLdr Data streamer.
+     */
+    IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
+        super(ctx);
+
+        assert dataLdr != null;
+
+        this.dataLdr = dataLdr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel() throws IgniteCheckedException {
+        checkValid();
+
+        if (onCancelled()) {
+            dataLdr.closeEx(true);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
+    }
+}