You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/03 19:39:58 UTC
[3/3] incubator-ignite git commit: # ignite-394: IgniteDataLoader ->
IgniteDataStreamer.java + impl
# ignite-394: IgniteDataLoader -> IgniteDataStreamer.java + impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9b33b651
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9b33b651
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9b33b651
Branch: refs/heads/ignite-394
Commit: 9b33b6510f5b82c30c8e75a66eb328b00bc425e4
Parents: 6909cc4
Author: Artem Shutak <as...@gridgain.com>
Authored: Tue Mar 3 21:40:19 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Tue Mar 3 21:40:19 2015 +0300
----------------------------------------------------------------------
.../datagrid/CacheDataLoaderExample.java | 6 +-
.../datagrid/CachePopularNumbersExample.java | 4 +-
.../src/main/java/org/apache/ignite/Ignite.java | 6 +-
.../org/apache/ignite/IgniteDataLoader.java | 379 -----
.../org/apache/ignite/IgniteDataStreamer.java | 379 +++++
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 10 +-
.../GridDistributedCacheAdapter.java | 4 +-
.../dataload/GridDataLoadCacheUpdaters.java | 18 +-
.../dataload/GridDataLoadUpdateJob.java | 4 +-
.../dataload/GridDataLoaderFuture.java | 4 +-
.../dataload/GridDataLoaderProcessor.java | 16 +-
.../dataload/IgniteDataLoaderImpl.java | 1453 ------------------
.../dataload/IgniteDataStreamerImpl.java | 1453 ++++++++++++++++++
.../dr/GridDrDataLoadCacheUpdater.java | 2 +-
.../processors/igfs/IgfsDataManager.java | 10 +-
...iteTxConsistencyRestartAbstractSelfTest.java | 2 +-
...idCachePartitionedHitsAndMissesSelfTest.java | 4 +-
.../GridCacheLruNearEvictionPolicySelfTest.java | 2 +-
...heNearOnlyLruNearEvictionPolicySelfTest.java | 2 +-
.../dataload/GridDataLoaderImplSelfTest.java | 5 +-
.../dataload/GridDataLoaderPerformanceTest.java | 2 +-
.../GridDataLoaderProcessorSelfTest.java | 27 +-
.../loadtests/colocation/GridTestMain.java | 2 +-
.../loadtests/discovery/GridGcTimeoutTest.java | 2 +-
.../mapper/GridContinuousMapperLoadTest1.java | 2 +-
.../mapper/GridContinuousMapperLoadTest2.java | 2 +-
.../ignite/testframework/junits/IgniteMock.java | 4 +-
.../scala/org/apache/ignite/scalar/scalar.scala | 8 +-
.../org/apache/ignite/IgniteSpringBean.java | 2 +-
.../cache/IgniteSqlQueryBenchmark.java | 2 +-
.../cache/IgniteSqlQueryJoinBenchmark.java | 2 +-
32 files changed, 1909 insertions(+), 1911 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
index 57b0cd2..4cdbfd4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheDataLoaderExample.java
@@ -21,8 +21,8 @@ import org.apache.ignite.*;
import org.apache.ignite.examples.*;
/**
- * Demonstrates how cache can be populated with data utilizing {@link IgniteDataLoader} API.
- * {@link IgniteDataLoader} is a lot more efficient to use than standard
+ * Demonstrates how cache can be populated with data utilizing {@link IgniteDataStreamer} API.
+ * {@link IgniteDataStreamer} is a lot more efficient to use than standard
* {@code put(...)} operation as it properly buffers cache requests
* together and properly manages load on remote nodes.
* <p>
@@ -63,7 +63,7 @@ public class CacheDataLoaderExample {
long start = System.currentTimeMillis();
- try (IgniteDataLoader<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
+ try (IgniteDataStreamer<Integer, String> ldr = ignite.dataLoader(CACHE_NAME)) {
// Configure loader.
ldr.perNodeBufferSize(1024);
ldr.perNodeParallelLoadOperations(8);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
index 0f71681..1fc737b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePopularNumbersExample.java
@@ -92,7 +92,7 @@ public class CachePopularNumbersExample {
* @throws IgniteException If failed.
*/
private static void streamData(final Ignite ignite) throws IgniteException {
- try (IgniteDataLoader<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
+ try (IgniteDataStreamer<Integer, Long> ldr = ignite.dataLoader(CACHE_NAME)) {
// Set larger per-node buffer size since our state is relatively small.
ldr.perNodeBufferSize(2048);
@@ -140,7 +140,7 @@ public class CachePopularNumbersExample {
/**
* Increments value for key.
*/
- private static class IncrementingUpdater implements IgniteDataLoader.Updater<Integer, Long> {
+ private static class IncrementingUpdater implements IgniteDataStreamer.Updater<Integer, Long> {
/** */
private static final EntryProcessor<Integer, Long, Void> INC = new EntryProcessor<Integer, Long, Void>() {
@Override public Void process(MutableEntry<Integer, Long> e, Object... args) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 8851d8f..44d4ba9 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -42,7 +42,7 @@ import java.util.concurrent.*;
* In addition to {@link ClusterGroup} functionality, from here you can get the following:
* <ul>
* <li>{@link org.apache.ignite.cache.GridCache} - functionality for in-memory distributed cache.</li>
- * <li>{@link IgniteDataLoader} - functionality for loading data large amounts of data into cache.</li>
+ * <li>{@link IgniteDataStreamer} - functionality for loading large amounts of data into cache.</li>
* <li>{@link IgniteFs} - functionality for distributed Hadoop-compliant in-memory file system and map-reduce.</li>
* <li>{@link IgniteStreamer} - functionality for streaming events workflow with queries and indexes into rolling windows.</li>
* <li>{@link IgniteScheduler} - functionality for scheduling jobs using UNIX Cron syntax.</li>
@@ -205,12 +205,12 @@ public interface Ignite extends AutoCloseable {
/**
* Gets a new instance of data loader associated with given cache name. Data loader
* is responsible for loading external data into in-memory data grid. For more information
- * refer to {@link IgniteDataLoader} documentation.
+ * refer to {@link IgniteDataStreamer} documentation.
*
* @param cacheName Cache name ({@code null} for default cache).
* @return Data loader.
*/
- public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName);
+ public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName);
/**
* Gets an instance of IGFS - Ignite In-Memory File System, if one is not
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
deleted file mode 100644
index 3cff287..0000000
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataLoader.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite;
-
-import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Data loader is responsible for loading external data into cache. It achieves it by
- * properly buffering updates and properly mapping keys to nodes responsible for the data
- * to make sure that there is the least amount of data movement possible and optimal
- * network and memory utilization.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- * <p>
- * Also note that {@code GridDataLoader} is not the only way to load data into cache.
- * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
- * method to load data from underlying data store. You can also use standard
- * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
- * likely will not perform as well as this class for loading data. And finally,
- * data can be loaded from underlying data store on demand, whenever it is accessed -
- * for this no explicit data loading step is needed.
- * <p>
- * {@code IgniteDataLoader} supports the following configuration properties:
- * <ul>
- * <li>
- * {@link #perNodeBufferSize(int)} - when entries are added to data loader via
- * {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
- * away and are buffered internally for better performance and network utilization.
- * This setting controls the size of internal per-node buffer before buffered data
- * is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
- * value.
- * </li>
- * <li>
- * {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
- * to the data loader via {@link #addData(Object, Object)} method faster than it can
- * be put in cache. In this case, new buffered load messages are sent to remote nodes
- * before responses from previous ones are received. This could cause unlimited heap
- * memory utilization growth on local and remote nodes. To control memory utilization,
- * this setting limits maximum allowed number of parallel buffered load messages that
- * are being processed on remote nodes. If this number is exceeded, then
- * {@link #addData(Object, Object)} method will block to control memory utilization.
- * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
- * </li>
- * <li>
- * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
- * this is the time after which the loader will make an attempt to submit all data
- * added so far to remote nodes. Note that there is no guarantee that data will be
- * delivered after this concrete attempt (e.g., it can fail when topology is
- * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
- * </li>
- * <li>
- * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
- * updates and allow data loader choose most optimal concurrent implementation.
- * </li>
- * <li>
- * {@link #updater(IgniteDataLoader.Updater)} - defines how cache will be updated with loaded entries.
- * It allows to provide user-defined custom logic to update the cache in the most effective and flexible way.
- * </li>
- * <li>
- * {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
- * loaded by a data loader must be class-loadable from the same class-loader.
- * Ignite will make the best effort to detect the most suitable class-loader
- * for data loading. However, in complex cases, where compound or deeply nested
- * class-loaders are used, it is best to specify a deploy class which can be any
- * class loaded by the class-loader for given data.
- * </li>
- * </ul>
- */
-public interface IgniteDataLoader<K, V> extends AutoCloseable {
- /** Default max concurrent put operations count. */
- public static final int DFLT_MAX_PARALLEL_OPS = 16;
-
- /** Default per node buffer size. */
- public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
-
- /**
- * Name of cache to load data to.
- *
- * @return Cache name or {@code null} for default cache.
- */
- public String cacheName();
-
- /**
- * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
- * Default is {@code true}.
- *
- * @return Flag value.
- */
- public boolean allowOverwrite();
-
- /**
- * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
- * Should not be used when custom cache updater set using {@link #updater(IgniteDataLoader.Updater)} method.
- * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
- *
- * @param allowOverwrite Flag value.
- * @throws IgniteException If failed.
- */
- public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
-
- /**
- * Gets flag indicating that write-through behavior should be disabled for data loading.
- * Default is {@code false}.
- *
- * @return Skip store flag.
- */
- public boolean skipStore();
-
- /**
- * Sets flag indicating that write-through behavior should be disabled for data loading.
- * Default is {@code false}.
- *
- * @param skipStore Skip store flag.
- */
- public void skipStore(boolean skipStore);
-
- /**
- * Gets size of per node key-value pairs buffer.
- *
- * @return Per node buffer size.
- */
- public int perNodeBufferSize();
-
- /**
- * Sets size of per node key-value pairs buffer.
- * <p>
- * This method should be called prior to {@link #addData(Object, Object)} call.
- * <p>
- * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
- *
- * @param bufSize Per node buffer size.
- */
- public void perNodeBufferSize(int bufSize);
-
- /**
- * Gets maximum number of parallel load operations for a single node.
- *
- * @return Maximum number of parallel load operations for a single node.
- */
- public int perNodeParallelLoadOperations();
-
- /**
- * Sets maximum number of parallel load operations for a single node.
- * <p>
- * This method should be called prior to {@link #addData(Object, Object)} call.
- * <p>
- * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
- *
- * @param parallelOps Maximum number of parallel load operations for a single node.
- */
- public void perNodeParallelLoadOperations(int parallelOps);
-
- /**
- * Gets automatic flush frequency. Essentially, this is the time after which the
- * loader will make an attempt to submit all data added so far to remote nodes.
- * Note that there is no guarantee that data will be delivered after this concrete
- * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
- * <p>
- * If set to {@code 0}, automatic flush is disabled.
- * <p>
- * Automatic flush is disabled by default (default value is {@code 0}).
- *
- * @return Flush frequency or {@code 0} if automatic flush is disabled.
- * @see #flush()
- */
- public long autoFlushFrequency();
-
- /**
- * Sets automatic flush frequency. Essentially, this is the time after which the
- * loader will make an attempt to submit all data added so far to remote nodes.
- * Note that there is no guarantee that data will be delivered after this concrete
- * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
- * <p>
- * If set to {@code 0}, automatic flush is disabled.
- * <p>
- * Automatic flush is disabled by default (default value is {@code 0}).
- *
- * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
- * @see #flush()
- */
- public void autoFlushFrequency(long autoFlushFreq);
-
- /**
- * Gets future for this loading process. This future completes whenever method
- * {@link #close(boolean)} completes. By attaching listeners to this future
- * it is possible to get asynchronous notifications for completion of this
- * loading process.
- *
- * @return Future for this loading process.
- */
- public IgniteFuture<?> future();
-
- /**
- * Optional deploy class for peer deployment. All classes loaded by a data loader
- * must be class-loadable from the same class-loader. Ignite will make the best
- * effort to detect the most suitable class-loader for data loading. However,
- * in complex cases, where compound or deeply nested class-loaders are used,
- * it is best to specify a deploy class which can be any class loaded by
- * the class-loader for given data.
- *
- * @param depCls Any class loaded by the class-loader for given data.
- */
- public void deployClass(Class<?> depCls);
-
- /**
- * Sets custom cache updater to this data loader.
- *
- * @param updater Cache updater.
- */
- public void updater(Updater<K, V> updater);
-
- /**
- * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
- *
- * @param key Key.
- * @return Future fo this operation.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- */
- public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param key Key.
- * @param val Value or {@code null} if respective entry must be removed from cache.
- * @return Future fo this operation.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- */
- public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
- IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param entry Entry.
- * @return Future fo this operation.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- */
- public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
- IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param entries Collection of entries to be loaded.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @return Future for this load operation.
- */
- public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
-
- /**
- * Adds data for loading on remote node. This method can be called from multiple
- * threads in parallel to speed up loading if needed.
- * <p>
- * Note that loader will load data concurrently by multiple internal threads, so the
- * data may get to remote nodes in different order from which it was added to
- * the loader.
- *
- * @param entries Map to be loaded.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @return Future for this load operation.
- */
- public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
-
- /**
- * Loads any remaining data, but doesn't close the loader. Data can be still added after
- * flush is finished. This method blocks and doesn't allow to add any data until all data
- * is loaded.
- * <p>
- * If another thread is already performing flush, this method will block, wait for
- * another thread to complete flush and exit. If you don't want to wait in this case,
- * use {@link #tryFlush()} method.
- *
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @see #tryFlush()
- */
- public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
- /**
- * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush},
- * with the difference that it won't wait and will exit immediately.
- *
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- * @throws IllegalStateException If grid has been concurrently stopped or
- * {@link #close(boolean)} has already been called on loader.
- * @see #flush()
- */
- public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
-
- /**
- * Loads any remaining data and closes this loader.
- *
- * @param cancel {@code True} to cancel ongoing loading operations.
- * @throws IgniteException If failed to map key to node.
- * @throws IgniteInterruptedException If thread has been interrupted.
- */
- public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
-
- /**
- * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
- * <p>
- * The method is invoked automatically on objects managed by the
- * {@code try-with-resources} statement.
- *
- * @throws IgniteException If failed to close data loader.
- * @throws IgniteInterruptedException If thread has been interrupted.
- */
- @Override public void close() throws IgniteException, IgniteInterruptedException;
-
- /**
- * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataLoader#allowOverwrite(boolean)}
- * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
- * performance custom user-defined implementation may help.
- * <p>
- * Data loader can be configured to use custom implementation of updater instead of default one using
- * {@link IgniteDataLoader#updater(IgniteDataLoader.Updater)} method.
- */
- interface Updater<K, V> extends Serializable {
- /**
- * Updates cache with batch of entries.
- *
- * @param cache Cache.
- * @param entries Collection of entries.
- * @throws IgniteException If failed.
- */
- public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
new file mode 100644
index 0000000..c48d61a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Data loader is responsible for loading external data into cache. It achieves it by
+ * properly buffering updates and properly mapping keys to nodes responsible for the data
+ * to make sure that there is the least amount of data movement possible and optimal
+ * network and memory utilization.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ * <p>
+ * Also note that {@code IgniteDataStreamer} is not the only way to load data into cache.
+ * Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
+ * method to load data from underlying data store. You can also use standard
+ * cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
+ * likely will not perform as well as this class for loading data. And finally,
+ * data can be loaded from underlying data store on demand, whenever it is accessed -
+ * for this no explicit data loading step is needed.
+ * <p>
+ * {@code IgniteDataStreamer} supports the following configuration properties:
+ * <ul>
+ * <li>
+ * {@link #perNodeBufferSize(int)} - when entries are added to data loader via
+ * {@link #addData(Object, Object)} method, they are not sent to in-memory data grid right
+ * away and are buffered internally for better performance and network utilization.
+ * This setting controls the size of internal per-node buffer before buffered data
+ * is sent to remote node. Default is defined by {@link #DFLT_PER_NODE_BUFFER_SIZE}
+ * value.
+ * </li>
+ * <li>
+ * {@link #perNodeParallelLoadOperations(int)} - sometimes data may be added
+ * to the data loader via {@link #addData(Object, Object)} method faster than it can
+ * be put in cache. In this case, new buffered load messages are sent to remote nodes
+ * before responses from previous ones are received. This could cause unlimited heap
+ * memory utilization growth on local and remote nodes. To control memory utilization,
+ * this setting limits maximum allowed number of parallel buffered load messages that
+ * are being processed on remote nodes. If this number is exceeded, then
+ * {@link #addData(Object, Object)} method will block to control memory utilization.
+ * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value.
+ * </li>
+ * <li>
+ * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially,
+ * this is the time after which the loader will make an attempt to submit all data
+ * added so far to remote nodes. Note that there is no guarantee that data will be
+ * delivered after this concrete attempt (e.g., it can fail when topology is
+ * changing), but it won't be lost anyway. Disabled by default (default value is {@code 0}).
+ * </li>
+ * <li>
+ * {@link #allowOverwrite(boolean)} - defines if data loader will assume that there are no other concurrent
+ * updates and allow data loader to choose the most optimal concurrent implementation.
+ * </li>
+ * <li>
+ * {@link #updater(IgniteDataStreamer.Updater)} - defines how cache will be updated with loaded entries.
+ * It allows providing user-defined custom logic to update the cache in the most effective and flexible way.
+ * </li>
+ * <li>
+ * {@link #deployClass(Class)} - optional deploy class for peer deployment. All classes
+ * loaded by a data loader must be class-loadable from the same class-loader.
+ * Ignite will make the best effort to detect the most suitable class-loader
+ * for data loading. However, in complex cases, where compound or deeply nested
+ * class-loaders are used, it is best to specify a deploy class which can be any
+ * class loaded by the class-loader for given data.
+ * </li>
+ * </ul>
+ */
+public interface IgniteDataStreamer<K, V> extends AutoCloseable {
+ /** Default max concurrent put operations count. */
+ public static final int DFLT_MAX_PARALLEL_OPS = 16;
+
+ /** Default per node buffer size. */
+ public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024;
+
+ /**
+ * Name of cache to load data to.
+ *
+ * @return Cache name or {@code null} for default cache.
+ */
+ public String cacheName();
+
+ /**
+ * Gets flag value indicating that this data loader assumes that there are no other concurrent updates to the cache.
+ * Default is {@code true}.
+ *
+ * @return Flag value.
+ */
+ public boolean allowOverwrite();
+
+ /**
+ * Sets flag indicating that this data loader should assume that there are no other concurrent updates to the cache.
+ * Should not be used when custom cache updater set using {@link #updater(IgniteDataStreamer.Updater)} method.
+ * Default is {@code true}. When this flag is set, updates will not be propagated to the cache store.
+ *
+ * @param allowOverwrite Flag value.
+ * @throws IgniteException If failed.
+ */
+ public void allowOverwrite(boolean allowOverwrite) throws IgniteException;
+
+ /**
+ * Gets flag indicating that write-through behavior should be disabled for data loading.
+ * Default is {@code false}.
+ *
+ * @return Skip store flag.
+ */
+ public boolean skipStore();
+
+ /**
+ * Sets flag indicating that write-through behavior should be disabled for data loading.
+ * Default is {@code false}.
+ *
+ * @param skipStore Skip store flag.
+ */
+ public void skipStore(boolean skipStore);
+
+ /**
+ * Gets size of per node key-value pairs buffer.
+ *
+ * @return Per node buffer size.
+ */
+ public int perNodeBufferSize();
+
+ /**
+ * Sets size of per node key-value pairs buffer.
+ * <p>
+ * This method should be called prior to {@link #addData(Object, Object)} call.
+ * <p>
+ * If not provided, default value is {@link #DFLT_PER_NODE_BUFFER_SIZE}.
+ *
+ * @param bufSize Per node buffer size.
+ */
+ public void perNodeBufferSize(int bufSize);
+
+ /**
+ * Gets maximum number of parallel load operations for a single node.
+ *
+ * @return Maximum number of parallel load operations for a single node.
+ */
+ public int perNodeParallelLoadOperations();
+
+ /**
+ * Sets maximum number of parallel load operations for a single node.
+ * <p>
+ * This method should be called prior to {@link #addData(Object, Object)} call.
+ * <p>
+ * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}.
+ *
+ * @param parallelOps Maximum number of parallel load operations for a single node.
+ */
+ public void perNodeParallelLoadOperations(int parallelOps);
+
+ /**
+ * Gets automatic flush frequency. Essentially, this is the time after which the
+ * loader will make an attempt to submit all data added so far to remote nodes.
+ * Note that there is no guarantee that data will be delivered after this concrete
+ * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+ * <p>
+ * If set to {@code 0}, automatic flush is disabled.
+ * <p>
+ * Automatic flush is disabled by default (default value is {@code 0}).
+ *
+ * @return Flush frequency or {@code 0} if automatic flush is disabled.
+ * @see #flush()
+ */
+ public long autoFlushFrequency();
+
+ /**
+ * Sets automatic flush frequency. Essentially, this is the time after which the
+ * loader will make an attempt to submit all data added so far to remote nodes.
+ * Note that there is no guarantee that data will be delivered after this concrete
+ * attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.
+ * <p>
+ * If set to {@code 0}, automatic flush is disabled.
+ * <p>
+ * Automatic flush is disabled by default (default value is {@code 0}).
+ *
+ * @param autoFlushFreq Flush frequency or {@code 0} to disable automatic flush.
+ * @see #flush()
+ */
+ public void autoFlushFrequency(long autoFlushFreq);
+
+ /**
+ * Gets future for this loading process. This future completes whenever method
+ * {@link #close(boolean)} completes. By attaching listeners to this future
+ * it is possible to get asynchronous notifications for completion of this
+ * loading process.
+ *
+ * @return Future for this loading process.
+ */
+ public IgniteFuture<?> future();
+
+ /**
+ * Optional deploy class for peer deployment. All classes loaded by a data loader
+ * must be class-loadable from the same class-loader. Ignite will make the best
+ * effort to detect the most suitable class-loader for data loading. However,
+ * in complex cases, where compound or deeply nested class-loaders are used,
+ * it is best to specify a deploy class which can be any class loaded by
+ * the class-loader for given data.
+ *
+ * @param depCls Any class loaded by the class-loader for given data.
+ */
+ public void deployClass(Class<?> depCls);
+
+ /**
+ * Sets custom cache updater to this data loader.
+ *
+ * @param updater Cache updater.
+ */
+ public void updater(Updater<K, V> updater);
+
+ /**
+ * Adds key for removal on remote node. Equivalent to {@link #addData(Object, Object) addData(key, null)}.
+ *
+ * @param key Key.
+ * @return Future for this operation.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ */
+ public IgniteFuture<?> removeData(K key) throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param key Key.
+ * @param val Value or {@code null} if respective entry must be removed from cache.
+ * @return Future for this operation.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ */
+ public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IgniteInterruptedException,
+ IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param entry Entry.
+ * @return Future for this operation.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ */
+ public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IgniteInterruptedException,
+ IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param entries Collection of entries to be loaded.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @return Future for this load operation.
+ */
+ public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) throws IllegalStateException;
+
+ /**
+ * Adds data for loading on remote node. This method can be called from multiple
+ * threads in parallel to speed up loading if needed.
+ * <p>
+ * Note that loader will load data concurrently by multiple internal threads, so the
+ * data may get to remote nodes in different order from which it was added to
+ * the loader.
+ *
+ * @param entries Map to be loaded.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @return Future for this load operation.
+ */
+ public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException;
+
+ /**
+ * Loads any remaining data, but doesn't close the loader. Data can be still added after
+ * flush is finished. This method blocks and doesn't allow adding any data until all data
+ * is loaded.
+ * <p>
+ * If another thread is already performing flush, this method will block, wait for
+ * another thread to complete flush and exit. If you don't want to wait in this case,
+ * use {@link #tryFlush()} method.
+ *
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @see #tryFlush()
+ */
+ public void flush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+ /**
+ * Makes an attempt to load remaining data. This method is mostly similar to {@link #flush()},
+ * with the difference that it won't wait and will exit immediately.
+ *
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ * @throws IllegalStateException If grid has been concurrently stopped or
+ * {@link #close(boolean)} has already been called on loader.
+ * @see #flush()
+ */
+ public void tryFlush() throws IgniteException, IgniteInterruptedException, IllegalStateException;
+
+ /**
+ * Loads any remaining data and closes this loader.
+ *
+ * @param cancel {@code True} to cancel ongoing loading operations.
+ * @throws IgniteException If failed to map key to node.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ */
+ public void close(boolean cancel) throws IgniteException, IgniteInterruptedException;
+
+ /**
+ * Closes data loader. This method is identical to calling {@link #close(boolean) close(false)} method.
+ * <p>
+ * The method is invoked automatically on objects managed by the
+ * {@code try-with-resources} statement.
+ *
+ * @throws IgniteException If failed to close data loader.
+ * @throws IgniteInterruptedException If thread has been interrupted.
+ */
+ @Override public void close() throws IgniteException, IgniteInterruptedException;
+
+ /**
+ * Updates cache with batch of entries. Usually it is enough to configure {@link IgniteDataStreamer#allowOverwrite(boolean)}
+ * property and appropriate internal cache updater will be chosen automatically. But in some cases to achieve best
+ * performance custom user-defined implementation may help.
+ * <p>
+ * Data loader can be configured to use custom implementation of updater instead of default one using
+ * {@link IgniteDataStreamer#updater(IgniteDataStreamer.Updater)} method.
+ */
+ interface Updater<K, V> extends Serializable {
+ /**
+ * Updates cache with batch of entries.
+ *
+ * @param cache Cache.
+ * @param entries Collection of entries.
+ * @throws IgniteException If failed.
+ */
+ public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index f46d071..336f872 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2346,7 +2346,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** {@inheritDoc} */
- @Override public <K, V> IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+ @Override public <K, V> IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
guard();
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 12ea535..6ed5699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3877,7 +3877,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry();
if (ctx.store().isLocalStore()) {
- IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+ IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -4043,7 +4043,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
* @throws IgniteCheckedException If failed.
*/
private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException {
- try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
+ try (final IgniteDataStreamer<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
ldr.allowOverwrite(true);
ldr.skipStore(true);
@@ -4086,7 +4086,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry();
if (ctx.store().isLocalStore()) {
- IgniteDataLoaderImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
+ IgniteDataStreamerImpl<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false);
try {
ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
@@ -6134,7 +6134,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
final Collection<Map.Entry<K, V>> col;
/** */
- final IgniteDataLoaderImpl<K, V> ldr;
+ final IgniteDataStreamerImpl<K, V> ldr;
/** */
final ExpiryPolicy plc;
@@ -6145,7 +6145,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
* @param plc Optional expiry policy.
*/
private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p,
- IgniteDataLoaderImpl<K, V> ldr,
+ IgniteDataStreamerImpl<K, V> ldr,
@Nullable ExpiryPolicy plc) {
this.p = p;
this.ldr = ldr;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 00190d9..c99efc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -276,8 +276,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
else
dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
- try (IgniteDataLoader<K, V> dataLdr = ignite.dataLoader(cacheName)) {
- ((IgniteDataLoaderImpl)dataLdr).maxRemapCount(0);
+ try (IgniteDataStreamer<K, V> dataLdr = ignite.dataLoader(cacheName)) {
+ ((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
index e2e780b..78a7e62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
@@ -29,13 +29,13 @@ import java.util.*;
*/
public class GridDataLoadCacheUpdaters {
/** */
- private static final IgniteDataLoader.Updater INDIVIDUAL = new Individual();
+ private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
/** */
- private static final IgniteDataLoader.Updater BATCHED = new Batched();
+ private static final IgniteDataStreamer.Updater BATCHED = new Batched();
/** */
- private static final IgniteDataLoader.Updater BATCHED_SORTED = new BatchedSorted();
+ private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
/**
* Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
@@ -44,7 +44,7 @@ public class GridDataLoadCacheUpdaters {
*
* @return Single updater.
*/
- public static <K, V> IgniteDataLoader.Updater<K, V> individual() {
+ public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
return INDIVIDUAL;
}
@@ -55,7 +55,7 @@ public class GridDataLoadCacheUpdaters {
*
* @return Batched updater.
*/
- public static <K, V> IgniteDataLoader.Updater<K, V> batched() {
+ public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
return BATCHED;
}
@@ -66,7 +66,7 @@ public class GridDataLoadCacheUpdaters {
*
* @return Batched sorted updater.
*/
- public static <K extends Comparable<?>, V> IgniteDataLoader.Updater<K, V> batchedSorted() {
+ public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
return BATCHED_SORTED;
}
@@ -93,7 +93,7 @@ public class GridDataLoadCacheUpdaters {
/**
* Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
*/
- private static class Individual<K, V> implements IgniteDataLoader.Updater<K, V> {
+ private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;
@@ -120,7 +120,7 @@ public class GridDataLoadCacheUpdaters {
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
- private static class Batched<K, V> implements IgniteDataLoader.Updater<K, V> {
+ private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;
@@ -160,7 +160,7 @@ public class GridDataLoadCacheUpdaters {
/**
* Batched updater. Updates cache using batch operations thus is dead lock prone.
*/
- private static class BatchedSorted<K, V> implements IgniteDataLoader.Updater<K, V> {
+ private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
index 9e2a483..8aa554a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
@@ -48,7 +48,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
private final boolean skipStore;
/** */
- private final IgniteDataLoader.Updater<K, V> updater;
+ private final IgniteDataStreamer.Updater<K, V> updater;
/**
* @param ctx Context.
@@ -65,7 +65,7 @@ class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
Collection<Map.Entry<K, V>> col,
boolean ignoreDepOwnership,
boolean skipStore,
- IgniteDataLoader.Updater<K, V> updater) {
+ IgniteDataStreamer.Updater<K, V> updater) {
this.ctx = ctx;
this.log = log;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
index 5efcfe9..dffa862 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderFuture.java
@@ -34,7 +34,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
/** Data loader. */
@GridToStringExclude
- private IgniteDataLoaderImpl dataLdr;
+ private IgniteDataStreamerImpl dataLdr;
/**
* Default constructor for {@link Externalizable} support.
@@ -47,7 +47,7 @@ class GridDataLoaderFuture extends GridFutureAdapter<Object> {
* @param ctx Context.
* @param dataLdr Data loader.
*/
- GridDataLoaderFuture(GridKernalContext ctx, IgniteDataLoaderImpl dataLdr) {
+ GridDataLoaderFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
super(ctx);
assert dataLdr != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9b33b651/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
index d470d02..b29c9ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessor.java
@@ -41,7 +41,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
*/
public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
/** Loaders map (access is not supposed to be highly concurrent). */
- private Collection<IgniteDataLoaderImpl> ldrs = new GridConcurrentHashSet<>();
+ private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
@@ -50,7 +50,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
private Thread flusher;
/** */
- private final DelayQueue<IgniteDataLoaderImpl<K, V>> flushQ = new DelayQueue<>();
+ private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
/** Marshaller. */
private final Marshaller marsh;
@@ -80,7 +80,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
@Override protected void body() throws InterruptedException {
while (!isCancelled()) {
- IgniteDataLoaderImpl<K, V> ldr = flushQ.take();
+ IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
if (!busyLock.enterBusy())
return;
@@ -118,7 +118,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
U.interrupt(flusher);
U.join(flusher, log);
- for (IgniteDataLoaderImpl<?, ?> ldr : ldrs) {
+ for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
if (log.isDebugEnabled())
log.debug("Closing active data loader on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
@@ -142,12 +142,12 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
* @param compact {@code true} if data loader should transfer data in compact format.
* @return Data loader.
*/
- public IgniteDataLoaderImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
+ public IgniteDataStreamerImpl<K, V> dataLoader(@Nullable String cacheName, boolean compact) {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to create data loader (grid is stopping).");
try {
- final IgniteDataLoaderImpl<K, V> ldr = new IgniteDataLoaderImpl<>(ctx, cacheName, flushQ, compact);
+ final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
ldrs.add(ldr);
@@ -173,7 +173,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
* @param cacheName Cache name ({@code null} for default cache).
* @return Data loader.
*/
- public IgniteDataLoader<K, V> dataLoader(@Nullable String cacheName) {
+ public IgniteDataStreamer<K, V> dataLoader(@Nullable String cacheName) {
return dataLoader(cacheName, true);
}
@@ -234,7 +234,7 @@ public class GridDataLoaderProcessor<K, V> extends GridProcessorAdapter {
}
Collection<Map.Entry<K, V>> col;
- IgniteDataLoader.Updater<K, V> updater;
+ IgniteDataStreamer.Updater<K, V> updater;
try {
col = marsh.unmarshal(req.collectionBytes(), clsLdr);