You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/03 19:59:51 UTC

[2/2] incubator-ignite git commit: # ignite-394: "Data loader" -> "Data streamer" + fix names of some tests

# ignite-394: "Data loader" -> "Data streamer" + fix names of some tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/350ec49d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/350ec49d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/350ec49d

Branch: refs/heads/ignite-394
Commit: 350ec49d819eb0ba1fd75604a3665f610b1f4e69
Parents: b33ab6a
Author: Artem Shutak <as...@gridgain.com>
Authored: Tue Mar 3 22:00:13 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Tue Mar 3 22:00:13 2015 +0300

----------------------------------------------------------------------
 .../datagrid/CacheDataLoaderExample.java        |  85 --------
 .../datagrid/CacheDataStreamerExample.java      |  85 ++++++++
 .../ignite/examples/CacheExamplesSelfTest.java  |   4 +-
 .../src/main/java/org/apache/ignite/Ignite.java |   4 +-
 .../org/apache/ignite/IgniteDataStreamer.java   |   4 +-
 .../ignite/internal/GridKernalContext.java      |   2 +-
 .../dataload/IgniteDataStreamerFuture.java      |   6 +-
 .../dataload/IgniteDataStreamerImpl.java        |  12 +-
 .../dataload/IgniteDataStreamerProcessor.java   |   6 +-
 .../internal/processors/dataload/package.html   |   2 +-
 .../processors/streamer/IgniteStreamerImpl.java |   2 +-
 .../dataload/GridDataLoaderImplSelfTest.java    | 214 -------------------
 .../dataload/GridDataLoaderPerformanceTest.java | 199 -----------------
 .../IgniteDataStreamerImplSelfTest.java         | 214 +++++++++++++++++++
 .../IgniteDataStreamerPerformanceTest.java      | 199 +++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +-
 16 files changed, 520 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
deleted file mode 100644
index 8984fdd..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.datagrid;
-
-import org.apache.ignite.*;
-import org.apache.ignite.examples.*;
-
-/**
- * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API.
- * {@link IgniteDataStreamer} is a lot more efficient to use than standard
- * {@code put(...)} operation as it properly buffers cache requests
- * together and properly manages load on remote nodes.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}.
- * <p>
- * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
- * start node with {@code examples/config/example-cache.xml} configuration.
- */
-public class CacheDataLoaderExample {
-    /** Cache name. */
-    private static final String CACHE_NAME = "partitioned";
-
-    /** Number of entries to load. */
-    private static final int ENTRY_COUNT = 500000;
-
-    /** Heap size required to run this example. */
-    public static final int MIN_MEMORY = 512 * 1024 * 1024;
-
-    /**
-     * Executes example.
-     *
-     * @param args Command line arguments, none required.
-     * @throws IgniteException If example execution failed.
-     */
-    public static void main(String[] args) throws IgniteException {
-        ExamplesUtils.checkMinMemory(MIN_MEMORY);
-
-        try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) {
-            System.out.println();
-            System.out.println(">>> Cache data loader example started.");
-
-            // Clean up caches on all nodes before run.
-            ignite.jcache(CACHE_NAME).clear();
-
-            System.out.println();
-            System.out.println(">>> Cache clear finished.");
-
-            long start = System.currentTimeMillis();
-
-            try (IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(CACHE_NAME)) {
-                // Configure loader.
-                ldr.perNodeBufferSize(1024);
-                ldr.perNodeParallelLoadOperations(8);
-
-                for (int i = 0; i < ENTRY_COUNT; i++) {
-                    ldr.addData(i, Integer.toString(i));
-
-                    // Print out progress while loading cache.
-                    if (i > 0 && i % 10000 == 0)
-                        System.out.println("Loaded " + i + " keys.");
-                }
-            }
-
-            long end = System.currentTimeMillis();
-
-            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java
new file mode 100644
index 0000000..fc1ef78
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataStreamerExample.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.datagrid;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+
+/**
+ * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API.
+ * {@link IgniteDataStreamer} is a lot more efficient to use than standard
+ * {@code put(...)} operation as it properly buffers cache requests
+ * together and properly manages load on remote nodes.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}.
+ * <p>
+ * Alternatively you can run {@link CacheNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-cache.xml} configuration.
+ */
+public class CacheDataStreamerExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = "partitioned";
+
+    /** Number of entries to load. */
+    private static final int ENTRY_COUNT = 500000;
+
+    /** Heap size required to run this example. */
+    public static final int MIN_MEMORY = 512 * 1024 * 1024;
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        ExamplesUtils.checkMinMemory(MIN_MEMORY);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache data loader example started.");
+
+            // Clean up caches on all nodes before run.
+            ignite.jcache(CACHE_NAME).clear();
+
+            System.out.println();
+            System.out.println(">>> Cache clear finished.");
+
+            long start = System.currentTimeMillis();
+
+            try (IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(CACHE_NAME)) {
+                // Configure loader.
+                ldr.perNodeBufferSize(1024);
+                ldr.perNodeParallelLoadOperations(8);
+
+                for (int i = 0; i < ENTRY_COUNT; i++) {
+                    ldr.addData(i, Integer.toString(i));
+
+                    // Print out progress while loading cache.
+                    if (i > 0 && i % 10000 == 0)
+                        System.out.println("Loaded " + i + " keys.");
+                }
+            }
+
+            long end = System.currentTimeMillis();
+
+            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index bd82760..c5c4599 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.examples;
 
 import org.apache.ignite.examples.datagrid.*;
-import org.apache.ignite.examples.datastructures.*;
 import org.apache.ignite.examples.datagrid.starschema.*;
 import org.apache.ignite.examples.datagrid.store.*;
+import org.apache.ignite.examples.datastructures.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 /**
@@ -115,7 +115,7 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
      * @throws Exception If failed.
      */
     public void testCacheDataLoaderExample() throws Exception {
-        CacheDataLoaderExample.main(EMPTY_ARGS);
+        CacheDataStreamerExample.main(EMPTY_ARGS);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 343d65d..f13def0 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -203,12 +203,12 @@ public interface Ignite extends AutoCloseable {
     public IgniteTransactions transactions();
 
     /**
-     * Gets a new instance of data loader associated with given cache name. Data loader
+     * Gets a new instance of data loader associated with given cache name. Data streamer
      * is responsible for loading external data into in-memory data grid. For more information
      * refer to {@link IgniteDataStreamer} documentation.
      *
      * @param cacheName Cache name ({@code null} for default cache).
-     * @return Data loader.
+     * @return Data streamer.
      */
     public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 519a0a0..ca9726d 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -24,7 +24,7 @@ import java.io.*;
 import java.util.*;
 
 /**
- * Data loader is responsible for loading external data into cache. It achieves it by
+ * Data streamer is responsible for loading external data into cache. It achieves it by
  * properly buffering updates and properly mapping keys to nodes responsible for the data
  * to make sure that there is the least amount of data movement possible and optimal
  * network and memory utilization.
@@ -363,7 +363,7 @@ public interface IgniteDataStreamer<K, V> extends AutoCloseable {
      * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
      * performance custom user-defined implementation may help.
      * <p>
-     * Data loader can be configured to use custom implementation of updater instead of default one using
+     * Data streamer can be configured to use custom implementation of updater instead of default one using
      * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method.
      */
     interface Updater<K, V> extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 53bd9d2..6d502bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -245,7 +245,7 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     /**
      * Gets data loader processor.
      *
-     * @return Data loader processor.
+     * @return Data streamer processor.
      */
     public <K, V> IgniteDataStreamerProcessor<K, V> dataStream();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
index e093b37..5730655 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
@@ -26,13 +26,13 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import java.io.*;
 
 /**
- * Data loader future.
+ * Data streamer future.
  */
 class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Data loader. */
+    /** Data streamer. */
     @GridToStringExclude
     private IgniteDataStreamerImpl dataLdr;
 
@@ -45,7 +45,7 @@ class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
 
     /**
      * @param ctx Context.
-     * @param dataLdr Data loader.
+     * @param dataLdr Data streamer.
      */
     IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
         super(ctx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
index 0476e2c..2e7517b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
@@ -52,7 +52,7 @@ import static org.apache.ignite.internal.GridTopic.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
- * Data loader implementation.
+ * Data streamer implementation.
  */
 @SuppressWarnings("unchecked")
 public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
@@ -253,7 +253,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
      */
     private void enterBusy() {
         if (!busyLock.enterBusy())
-            throw new IllegalStateException("Data loader has been closed.");
+            throw new IllegalStateException("Data streamer has been closed.");
     }
 
     /**
@@ -520,7 +520,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
                             log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
 
                         if (cancelled) {
-                            resFut.onDone(new IgniteCheckedException("Data loader has been cancelled: " +
+                            resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
                                 IgniteDataStreamerImpl.this, e1));
                         }
                         else if (remaps + 1 > maxRemapCnt) {
@@ -887,7 +887,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
                 submit(entries0, curFut0);
 
                 if (cancelled)
-                    curFut0.onDone(new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this));
+                    curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this));
             }
 
             return curFut0;
@@ -1160,7 +1160,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
          *
          */
         void cancelAll() {
-            IgniteCheckedException err = new IgniteCheckedException("Data loader has been cancelled: " + IgniteDataStreamerImpl.this);
+            IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this);
 
             for (IgniteInternalFuture<?> f : locFuts) {
                 try {
@@ -1191,7 +1191,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
     }
 
     /**
-     * Data loader peer-deploy aware.
+     * Data streamer peer-deploy aware.
      */
     private class DataLoaderPda implements GridPeerDeployAware {
         /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
index 3b25d17..c01d451 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
@@ -140,7 +140,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
     /**
      * @param cacheName Cache name ({@code null} for default cache).
      * @param compact {@code true} if data loader should transfer data in compact format.
-     * @return Data loader.
+     * @return Data streamer.
      */
     public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
         if (!busyLock.enterBusy())
@@ -171,7 +171,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
 
     /**
      * @param cacheName Cache name ({@code null} for default cache).
-     * @return Data loader.
+     * @return Data streamer.
      */
     public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
         return dataStreamer(cacheName, true);
@@ -310,7 +310,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
     /** {@inheritDoc} */
     @Override public void printMemoryStats() {
         X.println(">>>");
-        X.println(">>> Data loader processor memory stats [grid=" + ctx.gridName() + ']');
+        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
         X.println(">>>   ldrsSize: " + ldrs.size());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
index 47052a3..1090b86 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
@@ -19,6 +19,6 @@
 <html>
 <body>
     <!-- Package description. -->
-    Data loader processor.
+    Data streamer processor.
 </body>
 </html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
index 10fd3d8..c70f8e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/IgniteStreamerImpl.java
@@ -1139,7 +1139,7 @@ public class IgniteStreamerImpl implements IgniteStreamerEx, Externalizable {
     }
 
     /**
-     * Data loader peer-deploy aware.
+     * Data streamer peer-deploy aware.
      */
     private class StreamerPda implements GridPeerDeployAware {
         /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
deleted file mode 100644
index 2d1d79d..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderImplSelfTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Tests for {@code GridDataLoaderImpl}.
- */
-public class GridDataLoaderImplSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Number of keys to load via data loader. */
-    private static final int KEYS_COUNT = 1000;
-
-    /** Started grid counter. */
-    private static int cnt;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        // Forth node goes without cache.
-        if (cnt < 4)
-            cfg.setCacheConfiguration(cacheConfiguration());
-
-        cnt++;
-
-        return cfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testNullPointerExceptionUponDataLoaderClosing() throws Exception {
-        try {
-            startGrids(5);
-
-            final CyclicBarrier barrier = new CyclicBarrier(2);
-
-            multithreadedAsync(new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    U.awaitQuiet(barrier);
-
-                    G.stopAll(true);
-
-                    return null;
-                }
-            }, 1);
-
-            Ignite g4 = grid(4);
-
-            IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
-
-            dataLdr.perNodeBufferSize(32);
-
-            for (int i = 0; i < 100000; i += 2) {
-                dataLdr.addData(i, i);
-                dataLdr.removeData(i + 1);
-            }
-
-            U.awaitQuiet(barrier);
-
-            info("Closing data loader.");
-
-            try {
-                dataLdr.close(true);
-            }
-            catch (IllegalStateException ignore) {
-                // This is ok to ignore this exception as test is racy by it's nature -
-                // grid is stopping in different thread.
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Data loader should correctly load entries from HashMap in case of grids with more than one node
-     *  and with GridOptimizedMarshaller that requires serializable.
-     *
-     * @throws Exception If failed.
-     */
-    public void testAddDataFromMap() throws Exception {
-        try {
-            cnt = 0;
-
-            startGrids(2);
-
-            Ignite g0 = grid(0);
-
-            Marshaller marsh = g0.configuration().getMarshaller();
-
-            if (marsh instanceof OptimizedMarshaller)
-                assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable());
-            else
-                fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
-
-            IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
-
-            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
-
-            for (int i = 0; i < KEYS_COUNT; i ++)
-                map.put(i, String.valueOf(i));
-
-            dataLdr.addData(map);
-
-            dataLdr.close();
-
-            Random rnd = new Random();
-
-            IgniteCache<Integer, String> c = g0.jcache(null);
-
-            for (int i = 0; i < KEYS_COUNT; i ++) {
-                Integer k = rnd.nextInt(KEYS_COUNT);
-
-                String v = c.get(k);
-
-                assertEquals(k.toString(), v);
-            }
-        }
-        finally {
-            G.stopAll(true);
-        }
-    }
-
-    /**
-     * Gets cache configuration.
-     *
-     * @return Cache configuration.
-     */
-    private CacheConfiguration cacheConfiguration() {
-        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-        cacheCfg.setCacheMode(PARTITIONED);
-        cacheCfg.setBackups(1);
-        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-
-        return cacheCfg;
-    }
-
-    /**
-     *
-     */
-    private static class TestObject implements Serializable {
-        /** */
-        private int val;
-
-        /**
-         */
-        private TestObject() {
-            // No-op.
-        }
-
-        /**
-         * @param val Value.
-         */
-        private TestObject(int val) {
-            this.val = val;
-        }
-
-        public Integer val() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return val;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            return obj instanceof TestObject && ((TestObject)obj).val == val;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
deleted file mode 100644
index 89a2170..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderPerformanceTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jdk8.backport.*;
-
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Data loader performance test. Compares group lock data loader to traditional lock.
- * <p>
- * Disable assertions and give at least 2 GB heap to run this test.
- */
-public class GridDataLoaderPerformanceTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int GRID_CNT = 3;
-
-    /** */
-    private static final int ENTRY_CNT = 80000;
-
-    /** */
-    private boolean useCache;
-
-    /** */
-    private String[] vals = new String[2048];
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
-        spi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(spi);
-
-        cfg.setIncludeProperties();
-
-        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
-        cfg.setConnectorConfiguration(null);
-
-        cfg.setPeerClassLoadingEnabled(true);
-
-        if (useCache) {
-            CacheConfiguration cc = defaultCacheConfiguration();
-
-            cc.setCacheMode(PARTITIONED);
-
-            cc.setDistributionMode(PARTITIONED_ONLY);
-            cc.setWriteSynchronizationMode(FULL_SYNC);
-            cc.setStartSize(ENTRY_CNT / GRID_CNT);
-            cc.setSwapEnabled(false);
-
-            cc.setBackups(1);
-
-            cc.setStoreValueBytes(true);
-
-            cfg.setCacheSanityCheckEnabled(false);
-            cfg.setCacheConfiguration(cc);
-        }
-        else
-            cfg.setCacheConfiguration();
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        super.beforeTestsStarted();
-
-        for (int i = 0; i < vals.length; i++) {
-            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
-
-            StringBuilder sb = new StringBuilder();
-
-            for (int j = 0; j < valLen; j++)
-                sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
-
-            vals[i] = sb.toString();
-
-            info("Value: " + vals[i]);
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testPerformance() throws Exception {
-        doTest();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void doTest() throws Exception {
-        System.gc();
-        System.gc();
-        System.gc();
-
-        try {
-            useCache = true;
-
-            startGridsMultiThreaded(GRID_CNT);
-
-            useCache = false;
-
-            Ignite ignite = startGrid();
-
-            final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
-
-            ldr.perNodeBufferSize(8192);
-            ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());
-            ldr.autoFlushFrequency(0);
-
-            final LongAdder cnt = new LongAdder();
-
-            long start = U.currentTimeMillis();
-
-            Thread t = new Thread(new Runnable() {
-                @SuppressWarnings("BusyWait")
-                @Override public void run() {
-                    while (true) {
-                        try {
-                            Thread.sleep(10000);
-                        }
-                        catch (InterruptedException ignored) {
-                            break;
-                        }
-
-                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
-                    }
-                }
-            });
-
-            t.setDaemon(true);
-
-            t.start();
-
-            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
-
-            multithreaded(new Callable<Object>() {
-                @SuppressWarnings("InfiniteLoopStatement")
-                @Override public Object call() throws Exception {
-                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
-
-                    while (true) {
-                        int i = rnd.nextInt(ENTRY_CNT);
-
-                        ldr.addData(i, vals[rnd.nextInt(vals.length)]);
-
-                        cnt.increment();
-                    }
-                }
-            }, threadNum, "loader");
-
-            info("Closing loader...");
-
-            ldr.close(false);
-
-            long duration = U.currentTimeMillis() - start;
-
-            info("Finished performance test. Duration: " + duration + "ms.");
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
new file mode 100644
index 0000000..c84d9db
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests for {@code IgniteDataStreamerImpl}.
+ */
+public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of keys to load via data streamer. */
+    private static final int KEYS_COUNT = 1000;
+
+    /** Started grid counter. */
+    private static int cnt;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        // Only the first four configurations built since cnt was last zeroed get a cache; the fifth goes without.
+        if (cnt < 4)
+            cfg.setCacheConfiguration(cacheConfiguration());
+
+        cnt++;
+
+        return cfg;
+    }
+
+    /**
+     * @throws Exception If grid startup fails or closing the streamer throws an unexpected exception.
+     */
+    public void testNullPointerExceptionUponDataLoaderClosing() throws Exception {
+        try {
+            cnt = 0; // Static counter: restart it so the cache assignment does not depend on test order.
+            startGrids(5);
+
+            final CyclicBarrier barrier = new CyclicBarrier(2);
+
+            multithreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    U.awaitQuiet(barrier);
+
+                    G.stopAll(true);
+
+                    return null;
+                }
+            }, 1);
+
+            Ignite g4 = grid(4);
+
+            IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
+
+            dataLdr.perNodeBufferSize(32);
+
+            for (int i = 0; i < 100000; i += 2) {
+                dataLdr.addData(i, i);
+                dataLdr.removeData(i + 1);
+            }
+
+            U.awaitQuiet(barrier);
+
+            info("Closing data loader.");
+
+            try {
+                dataLdr.close(true);
+            }
+            catch (IllegalStateException ignore) {
+                // Safe to ignore: the test is racy by its nature, the grid is being stopped in a different thread.
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Data streamer should correctly load entries from a HashMap in case of grids with more than one node
+     * and with an {@link OptimizedMarshaller} that requires serializable classes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAddDataFromMap() throws Exception {
+        try {
+            cnt = 0;
+
+            startGrids(2);
+
+            Ignite g0 = grid(0);
+
+            Marshaller marsh = g0.configuration().getMarshaller();
+
+            if (marsh instanceof OptimizedMarshaller)
+                assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable());
+            else
+                fail("Expected OptimizedMarshaller, but found: " + (marsh == null ? null : marsh.getClass().getName()));
+
+            IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
+
+            Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+
+            for (int i = 0; i < KEYS_COUNT; i++)
+                map.put(i, String.valueOf(i));
+
+            dataLdr.addData(map);
+
+            dataLdr.close();
+
+            Random rnd = new Random();
+
+            IgniteCache<Integer, String> c = g0.jcache(null);
+
+            for (int i = 0; i < KEYS_COUNT; i++) {
+                Integer k = rnd.nextInt(KEYS_COUNT);
+
+                String v = c.get(k);
+
+                assertEquals(k.toString(), v);
+            }
+        }
+        finally {
+            G.stopAll(true);
+        }
+    }
+
+    /**
+     * Builds the cache configuration for test nodes: partitioned mode, one backup, full-sync writes.
+     *
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(1);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return cacheCfg;
+    }
+
+    /**
+     * Serializable holder of a single int; equality and hash code are derived from that value only.
+     */
+    private static class TestObject implements Serializable {
+        /** Wrapped value. */
+        private int val;
+
+        /**
+         */
+        private TestObject() {
+            // No-op.
+        }
+
+        /**
+         * @param val Value.
+         */
+        private TestObject(int val) {
+            this.val = val;
+        }
+
+        public Integer val() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof TestObject && ((TestObject)obj).val == val;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
new file mode 100644
index 0000000..5f18df8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Data streamer performance test. Compares group lock data streamer to traditional lock.
+ * <p>
+ * Disable assertions and give at least 2 GB heap to run this test.
+ */
+public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int GRID_CNT = 3;
+
+    /** */
+    private static final int ENTRY_CNT = 80000;
+
+    /** */
+    private boolean useCache;
+
+    /** */
+    private String[] vals = new String[2048];
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setIncludeProperties();
+
+        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+        cfg.setConnectorConfiguration(null);
+
+        cfg.setPeerClassLoadingEnabled(true);
+        // A cache is configured only while useCache is set; otherwise an empty cache configuration list is passed.
+        if (useCache) {
+            CacheConfiguration cc = defaultCacheConfiguration();
+
+            cc.setCacheMode(PARTITIONED);
+
+            cc.setDistributionMode(PARTITIONED_ONLY);
+            cc.setWriteSynchronizationMode(FULL_SYNC);
+            cc.setStartSize(ENTRY_CNT / GRID_CNT);
+            cc.setSwapEnabled(false);
+
+            cc.setBackups(1);
+
+            cc.setStoreValueBytes(true);
+
+            cfg.setCacheSanityCheckEnabled(false);
+            cfg.setCacheConfiguration(cc);
+        }
+        else
+            cfg.setCacheConfiguration();
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        for (int i = 0; i < vals.length; i++) {
+            int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
+
+            StringBuilder sb = new StringBuilder(valLen);
+            // The cast is required: 'a' + int is an int, which append() would render as decimal digits.
+            for (int j = 0; j < valLen; j++)
+                sb.append((char)('a' + ThreadLocalRandom8.current().nextInt(20)));
+
+            vals[i] = sb.toString();
+
+            info("Value: " + vals[i]);
+        }
+    }
+
+    /**
+     * @throws Exception If the streaming scenario in {@link #doTest()} fails.
+     */
+    public void testPerformance() throws Exception {
+        doTest();
+    }
+
+    /**
+     * @throws Exception If grid startup, streaming or closing the streamer fails.
+     */
+    private void doTest() throws Exception {
+        System.gc();
+        System.gc();
+        System.gc();
+
+        try {
+            useCache = true;
+            // Nodes started while useCache is set are configured with the cache.
+            startGridsMultiThreaded(GRID_CNT);
+
+            useCache = false;
+            // This additional node is started without a cache; the streamer below is created on it.
+            Ignite ignite = startGrid();
+
+            final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
+
+            ldr.perNodeBufferSize(8192);
+            ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());
+            ldr.autoFlushFrequency(0);
+
+            final LongAdder cnt = new LongAdder();
+
+            long start = U.currentTimeMillis();
+            // Reporter thread: every 10 seconds logs the average adds per second and resets the counter.
+            Thread t = new Thread(new Runnable() {
+                @SuppressWarnings("BusyWait")
+                @Override public void run() {
+                    while (true) {
+                        try {
+                            Thread.sleep(10000);
+                        }
+                        catch (InterruptedException ignored) {
+                            break;
+                        }
+
+                        info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
+                    }
+                }
+            });
+
+            t.setDaemon(true);
+
+            t.start();
+
+            int threadNum = 2;//Runtime.getRuntime().availableProcessors();
+            // Each loader thread adds random key/value pairs in an endless loop.
+            multithreaded(new Callable<Object>() {
+                @SuppressWarnings("InfiniteLoopStatement")
+                @Override public Object call() throws Exception {
+                    ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+
+                    while (true) {
+                        int i = rnd.nextInt(ENTRY_CNT);
+
+                        ldr.addData(i, vals[rnd.nextInt(vals.length)]);
+
+                        cnt.increment();
+                    }
+                }
+            }, threadNum, "loader");
+
+            info("Closing loader...");
+
+            ldr.close(false);
+
+            long duration = U.currentTimeMillis() - start;
+
+            info("Finished performance test. Duration: " + duration + "ms.");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/350ec49d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index a30eea5..30285a8 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -110,7 +110,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAffinityApiSelfTest.class);
         suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class);
         suite.addTestSuite(IgniteDataStreamerProcessorSelfTest.class);
-        suite.addTestSuite(GridDataLoaderImplSelfTest.class);
+        suite.addTestSuite(IgniteDataStreamerImplSelfTest.class);
         suite.addTestSuite(GridCacheEntryMemorySizeSelfTest.class);
         suite.addTestSuite(GridCacheClearAllSelfTest.class);
         suite.addTestSuite(GridCacheObjectToStringSelfTest.class);