You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/04 14:44:05 UTC
[1/5] incubator-ignite git commit: # gg-9869
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-394 e5f686239 -> 9c8217c17
# gg-9869
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/96f426bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/96f426bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/96f426bb
Branch: refs/heads/ignite-394
Commit: 96f426bb0b5b3d19badac0610726f58ca4a0c15e
Parents: e5f6862
Author: Artem Shutak <as...@gridgain.com>
Authored: Wed Mar 4 15:24:33 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Wed Mar 4 15:24:33 2015 +0300
----------------------------------------------------------------------
.../ignite/examples/CacheExamplesSelfTest.java | 2 +-
.../org/apache/ignite/IgniteDataStreamer.java | 2 +-
.../GridDistributedCacheAdapter.java | 2 +-
.../dataload/GridDataLoadCacheUpdaters.java | 199 -------------------
.../dataload/GridDataLoadUpdateJob.java | 119 -----------
.../IgniteDataStreamerCacheUpdaters.java | 199 +++++++++++++++++++
.../dataload/IgniteDataStreamerImpl.java | 10 +-
.../dataload/IgniteDataStreamerProcessor.java | 2 +-
.../dataload/IgniteDataStreamerUpdateJob.java | 119 +++++++++++
.../processors/igfs/IgfsDataManager.java | 2 +-
.../IgniteDataStreamerImplSelfTest.java | 4 +-
.../IgniteDataStreamerPerformanceTest.java | 2 +-
.../IgniteDataStreamerProcessorSelfTest.java | 22 +-
13 files changed, 342 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
index c5c4599..14af44f 100644
--- a/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
+++ b/examples/src/test/java/org/apache/ignite/examples/CacheExamplesSelfTest.java
@@ -114,7 +114,7 @@ public class CacheExamplesSelfTest extends GridAbstractExamplesTest {
/**
* @throws Exception If failed.
*/
- public void testCacheDataLoaderExample() throws Exception {
+ public void testCacheDataStreamerExample() throws Exception {
CacheDataStreamerExample.main(EMPTY_ARGS);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
index 22aa0c1..a47c079 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java
@@ -33,7 +33,7 @@ import java.util.*;
* data may get to remote nodes in different order from which it was added to
* the loader.
* <p>
- * Also note that {@code GridDataLoader} is not the only way to load data into cache.
+ * Also note that {@code IgniteDataStreamer} is not the only way to load data into cache.
* Alternatively you can use {@link IgniteCache#loadCache(IgniteBiPredicate, Object...)}
* method to load data from underlying data store. You can also use standard
* cache {@code put(...)} and {@code putAll(...)} operations as well, but they most
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 5791b8d..16419f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -279,7 +279,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
try (IgniteDataStreamer<K, V> dataLdr = ignite.dataStreamer(cacheName)) {
((IgniteDataStreamerImpl)dataLdr).maxRemapCount(0);
- dataLdr.updater(GridDataLoadCacheUpdaters.<K, V>batched());
+ dataLdr.updater(IgniteDataStreamerCacheUpdaters.<K, V>batched());
for (GridDhtLocalPartition<K, V> locPart : dht.topology().currentLocalPartitions()) {
if (!locPart.isEmpty() && locPart.primary(topVer)) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
deleted file mode 100644
index 78a7e62..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadCacheUpdaters.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Bundled factory for cache updaters.
- */
-public class GridDataLoadCacheUpdaters {
- /** */
- private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
-
- /** */
- private static final IgniteDataStreamer.Updater BATCHED = new Batched();
-
- /** */
- private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
-
- /**
- * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
- * is not the best.
- *
- * @return Single updater.
- */
- public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
- return INDIVIDUAL;
- }
-
- /**
- * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
- * updated concurrently. Performance is generally better than in {@link #individual()}.
- *
- * @return Batched updater.
- */
- public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
- return BATCHED;
- }
-
- /**
- * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
- * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
- *
- * @return Batched sorted updater.
- */
- public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
- return BATCHED_SORTED;
- }
-
- /**
- * Updates cache.
- *
- * @param cache Cache.
- * @param rmvCol Keys to remove.
- * @param putMap Entries to put.
- * @throws IgniteException If failed.
- */
- protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
- Map<K, V> putMap) {
- assert rmvCol != null || putMap != null;
-
- // Here we assume that there are no key duplicates, so the following calls are valid.
- if (rmvCol != null)
- ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
-
- if (putMap != null)
- cache.putAll(putMap);
- }
-
- /**
- * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
- */
- private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key != null;
-
- V val = entry.getValue();
-
- if (val == null)
- cache.remove(key);
- else
- cache.put(key, val);
- }
- }
- }
-
- /**
- * Batched updater. Updates cache using batch operations thus is dead lock prone.
- */
- private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- Map<K, V> putAll = null;
- Set<K> rmvAll = null;
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key != null;
-
- V val = entry.getValue();
-
- if (val == null) {
- if (rmvAll == null)
- rmvAll = new HashSet<>();
-
- rmvAll.add(key);
- }
- else {
- if (putAll == null)
- putAll = new HashMap<>();
-
- putAll.put(key, val);
- }
- }
-
- updateAll(cache, rmvAll, putAll);
- }
- }
-
- /**
- * Batched updater. Updates cache using batch operations thus is dead lock prone.
- */
- private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- Map<K, V> putAll = null;
- Set<K> rmvAll = null;
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key instanceof Comparable;
-
- V val = entry.getValue();
-
- if (val == null) {
- if (rmvAll == null)
- rmvAll = new TreeSet<>();
-
- rmvAll.add(key);
- }
- else {
- if (putAll == null)
- putAll = new TreeMap<>();
-
- putAll.put(key, val);
- }
- }
-
- updateAll(cache, rmvAll, putAll);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
deleted file mode 100644
index 8aa554a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadUpdateJob.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class GridDataLoadUpdateJob<K, V> implements GridPlainCallable<Object> {
- /** */
- private final GridKernalContext ctx;
-
- /** */
- private final IgniteLogger log;
-
- /** Cache name. */
- private final String cacheName;
-
- /** Entries to put. */
- private final Collection<Map.Entry<K, V>> col;
-
- /** {@code True} to ignore deployment ownership. */
- private final boolean ignoreDepOwnership;
-
- /** */
- private final boolean skipStore;
-
- /** */
- private final IgniteDataStreamer.Updater<K, V> updater;
-
- /**
- * @param ctx Context.
- * @param log Log.
- * @param cacheName Cache name.
- * @param col Entries to put.
- * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
- * @param updater Updater.
- */
- GridDataLoadUpdateJob(
- GridKernalContext ctx,
- IgniteLogger log,
- @Nullable String cacheName,
- Collection<Map.Entry<K, V>> col,
- boolean ignoreDepOwnership,
- boolean skipStore,
- IgniteDataStreamer.Updater<K, V> updater) {
- this.ctx = ctx;
- this.log = log;
-
- assert col != null && !col.isEmpty();
- assert updater != null;
-
- this.cacheName = cacheName;
- this.col = col;
- this.ignoreDepOwnership = ignoreDepOwnership;
- this.skipStore = skipStore;
- this.updater = updater;
- }
-
- /** {@inheritDoc} */
- @Override public Object call() throws Exception {
- if (log.isDebugEnabled())
- log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
-
-// TODO IGNITE-77: restore adapter usage.
-// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-//
-// IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-// if (!f.isDone())
-// f.get();
-//
-// if (ignoreDepOwnership)
-// cache.context().deploy().ignoreOwnership(true);
-
- IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
- if (skipStore)
- cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
- if (ignoreDepOwnership)
- cache.context().deploy().ignoreOwnership(true);
-
- try {
- updater.update(cache, col);
-
- return null;
- }
- finally {
- if (ignoreDepOwnership)
- cache.context().deploy().ignoreOwnership(false);
-
- if (log.isDebugEnabled())
- log.debug("Update job finished on node: " + ctx.localNodeId());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..1742041
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class IgniteDataStreamerCacheUpdaters {
+ /** */
+ private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
+
+ /** */
+ private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+ /** */
+ private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
+
+ /**
+ * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
+ * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
+ * is not the best.
+ *
+ * @return Single updater.
+ */
+ public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+ return INDIVIDUAL;
+ }
+
+ /**
+ * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+ * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
+ * updated concurrently. Performance is generally better than in {@link #individual()}.
+ *
+ * @return Batched updater.
+ */
+ public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+ return BATCHED;
+ }
+
+ /**
+ * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+ * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
+ * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
+ *
+ * @return Batched sorted updater.
+ */
+ public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
+ return BATCHED_SORTED;
+ }
+
+ /**
+ * Updates cache.
+ *
+ * @param cache Cache.
+ * @param rmvCol Keys to remove.
+ * @param putMap Entries to put.
+ * @throws IgniteException If failed.
+ */
+ protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
+ Map<K, V> putMap) {
+ assert rmvCol != null || putMap != null;
+
+ // Here we assume that there are no key duplicates, so the following calls are valid.
+ if (rmvCol != null)
+ ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+ if (putMap != null)
+ cache.putAll(putMap);
+ }
+
+ /**
+ * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
+ */
+ private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ V val = entry.getValue();
+
+ if (val == null)
+ cache.remove(key);
+ else
+ cache.put(key, val);
+ }
+ }
+ }
+
+ /**
+ * Batched updater. Updates cache using batch operations thus is dead lock prone.
+ */
+ private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ Map<K, V> putAll = null;
+ Set<K> rmvAll = null;
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ V val = entry.getValue();
+
+ if (val == null) {
+ if (rmvAll == null)
+ rmvAll = new HashSet<>();
+
+ rmvAll.add(key);
+ }
+ else {
+ if (putAll == null)
+ putAll = new HashMap<>();
+
+ putAll.put(key, val);
+ }
+ }
+
+ updateAll(cache, rmvAll, putAll);
+ }
+ }
+
+ /**
+ * Batched updater. Updates cache using batch operations thus is dead lock prone.
+ */
+ private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ Map<K, V> putAll = null;
+ Set<K> rmvAll = null;
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key instanceof Comparable;
+
+ V val = entry.getValue();
+
+ if (val == null) {
+ if (rmvAll == null)
+ rmvAll = new TreeSet<>();
+
+ rmvAll.add(key);
+ }
+ else {
+ if (putAll == null)
+ putAll = new TreeMap<>();
+
+ putAll.put(key, val);
+ }
+ }
+
+ updateAll(cache, rmvAll, putAll);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
index 80d08c8..1231e27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
@@ -302,7 +302,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
if (node == null)
throw new IgniteException("Failed to get node for cache: " + cacheName);
- updater = allow ? GridDataLoadCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
+ updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
}
/** {@inheritDoc} */
@@ -450,7 +450,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
assert key != null;
if (initPda) {
- jobPda = new DataLoaderPda(key, entry.getValue(), updater);
+ jobPda = new DataStreamerPda(key, entry.getValue(), updater);
initPda = false;
}
@@ -981,7 +981,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
if (isLocNode) {
fut = ctx.closure().callLocalSafe(
- new GridDataLoadUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+ new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
locFuts.add(fut);
@@ -1193,7 +1193,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
/**
* Data streamer peer-deploy aware.
*/
- private class DataLoaderPda implements GridPeerDeployAware {
+ private class DataStreamerPda implements GridPeerDeployAware {
/** */
private static final long serialVersionUID = 0L;
@@ -1211,7 +1211,7 @@ public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, D
*
* @param objs Collection of objects to detect deploy class and class loader.
*/
- private DataLoaderPda(Object... objs) {
+ private DataStreamerPda(Object... objs) {
this.objs = Arrays.asList(objs);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
index 69ea440..7db41e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
@@ -248,7 +248,7 @@ public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
return;
}
- GridDataLoadUpdateJob<K, V> job = new GridDataLoadUpdateJob<>(ctx,
+ IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
log,
req.cacheName(),
col,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
new file mode 100644
index 0000000..1a3db40
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.dataload;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
+ /** */
+ private final GridKernalContext ctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Entries to put. */
+ private final Collection<Map.Entry<K, V>> col;
+
+ /** {@code True} to ignore deployment ownership. */
+ private final boolean ignoreDepOwnership;
+
+ /** */
+ private final boolean skipStore;
+
+ /** */
+ private final IgniteDataStreamer.Updater<K, V> updater;
+
+ /**
+ * @param ctx Context.
+ * @param log Log.
+ * @param cacheName Cache name.
+ * @param col Entries to put.
+ * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+ * @param updater Updater.
+ */
+ IgniteDataStreamerUpdateJob(
+ GridKernalContext ctx,
+ IgniteLogger log,
+ @Nullable String cacheName,
+ Collection<Map.Entry<K, V>> col,
+ boolean ignoreDepOwnership,
+ boolean skipStore,
+ IgniteDataStreamer.Updater<K, V> updater) {
+ this.ctx = ctx;
+ this.log = log;
+
+ assert col != null && !col.isEmpty();
+ assert updater != null;
+
+ this.cacheName = cacheName;
+ this.col = col;
+ this.ignoreDepOwnership = ignoreDepOwnership;
+ this.skipStore = skipStore;
+ this.updater = updater;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ if (log.isDebugEnabled())
+ log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+
+// TODO IGNITE-77: restore adapter usage.
+// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+//
+// IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+// if (!f.isDone())
+// f.get();
+//
+// if (ignoreDepOwnership)
+// cache.context().deploy().ignoreOwnership(true);
+
+ IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
+
+ if (skipStore)
+ cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
+
+ if (ignoreDepOwnership)
+ cache.context().deploy().ignoreOwnership(true);
+
+ try {
+ updater.update(cache, col);
+
+ return null;
+ }
+ finally {
+ if (ignoreDepOwnership)
+ cache.context().deploy().ignoreOwnership(false);
+
+ if (log.isDebugEnabled())
+ log.debug("Update job finished on node: " + ctx.localNodeId());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index d585352..15309bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -314,7 +314,7 @@ public class IgfsDataManager extends IgfsManager {
if (cfg.getPerNodeParallelBatchCount() > 0)
ldr.perNodeParallelLoadOperations(cfg.getPerNodeParallelBatchCount());
- ldr.updater(GridDataLoadCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted());
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<IgfsBlockKey, byte[]>batchedSorted());
return ldr;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
index b0d8625..306e615 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
@@ -36,7 +36,7 @@ import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
/**
- * Tests for {@code GridDataLoaderImpl}.
+ * Tests for {@code IgniteDataStreamerImpl}.
*/
public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
/** IP finder. */
@@ -69,7 +69,7 @@ public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testNullPointerExceptionUponDataLoaderClosing() throws Exception {
+ public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
try {
startGrids(5);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
index b3dd71b..22a1f97 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
@@ -140,7 +140,7 @@ public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
ldr.perNodeBufferSize(8192);
- ldr.updater(GridDataLoadCacheUpdaters.<Integer, String>batchedSorted());
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted());
ldr.autoFlushFrequency(0);
final LongAdder cnt = new LongAdder();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f426bb/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
index 8eefebf..23a46e5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
@@ -124,7 +124,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
public void testPartitioned() throws Exception {
mode = PARTITIONED;
- checkDataLoader();
+ checkDataStreamer();
}
/**
@@ -134,7 +134,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
mode = PARTITIONED;
nearEnabled = false;
- checkDataLoader();
+ checkDataStreamer();
}
/**
@@ -143,7 +143,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
public void testReplicated() throws Exception {
mode = REPLICATED;
- checkDataLoader();
+ checkDataStreamer();
}
/**
@@ -153,7 +153,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
mode = LOCAL;
try {
- checkDataLoader();
+ checkDataStreamer();
assert false;
}
@@ -167,7 +167,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@SuppressWarnings("ErrorNotRethrown")
- private void checkDataLoader() throws Exception {
+ private void checkDataStreamer() throws Exception {
try {
Ignite g1 = startGrid(1);
@@ -178,7 +178,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
- ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
final AtomicInteger idxGen = new AtomicInteger();
final int cnt = 400;
@@ -220,7 +220,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
- rmvLdr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted());
+ rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
final CountDownLatch l2 = new CountDownLatch(threads);
@@ -265,7 +265,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
public void testPartitionedIsolated() throws Exception {
mode = PARTITIONED;
- checkIsolatedDataLoader();
+ checkIsolatedDataStreamer();
}
/**
@@ -274,13 +274,13 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
public void testReplicatedIsolated() throws Exception {
mode = REPLICATED;
- checkIsolatedDataLoader();
+ checkIsolatedDataStreamer();
}
/**
* @throws Exception If failed.
*/
- private void checkIsolatedDataLoader() throws Exception {
+ private void checkIsolatedDataStreamer() throws Exception {
try {
useCache = true;
@@ -418,7 +418,7 @@ public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest
// Get and configure loader.
final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
- ldr.updater(GridDataLoadCacheUpdaters.<Integer, Integer>individual());
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
ldr.perNodeBufferSize(2);
// Define count of puts.
[2/5] incubator-ignite git commit: # gg-9869
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
deleted file mode 100644
index 23a46e5..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessorSelfTest.java
+++ /dev/null
@@ -1,924 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cache.store.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jetbrains.annotations.*;
-
-import javax.cache.*;
-import javax.cache.configuration.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- *
- */
-public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest {
- /** */
- private static ConcurrentHashMap<Object, Object> storeMap;
-
- /** */
- private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private CacheMode mode = PARTITIONED;
-
- /** */
- private boolean nearEnabled = true;
-
- /** */
- private boolean useCache;
-
- /** */
- private TestStore store;
-
- /** {@inheritDoc} */
- @Override public void afterTest() throws Exception {
- super.afterTest();
-
- useCache = false;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"IfMayBeConditional", "unchecked"})
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
- spi.setIpFinder(ipFinder);
-
- cfg.setDiscoverySpi(spi);
-
- cfg.setIncludeProperties();
-
- cfg.setMarshaller(new OptimizedMarshaller(false));
-
- if (useCache) {
- CacheConfiguration cc = defaultCacheConfiguration();
-
- cc.setCacheMode(mode);
- cc.setAtomicityMode(TRANSACTIONAL);
- cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
- cc.setWriteSynchronizationMode(FULL_SYNC);
-
- cc.setEvictSynchronized(false);
- cc.setEvictNearSynchronized(false);
-
- if (store != null) {
- cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
- cc.setReadThrough(true);
- cc.setWriteThrough(true);
- }
-
- cfg.setCacheConfiguration(cc);
- }
- else
- cfg.setCacheConfiguration();
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitioned() throws Exception {
- mode = PARTITIONED;
-
- checkDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testColocated() throws Exception {
- mode = PARTITIONED;
- nearEnabled = false;
-
- checkDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicated() throws Exception {
- mode = REPLICATED;
-
- checkDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testLocal() throws Exception {
- mode = LOCAL;
-
- try {
- checkDataStreamer();
-
- assert false;
- }
- catch (IgniteCheckedException e) {
- // Cannot load local cache configured remotely.
- info("Caught expected exception: " + e);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- @SuppressWarnings("ErrorNotRethrown")
- private void checkDataStreamer() throws Exception {
- try {
- Ignite g1 = startGrid(1);
-
- useCache = true;
-
- Ignite g2 = startGrid(2);
- startGrid(3);
-
- final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
- ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
- final AtomicInteger idxGen = new AtomicInteger();
- final int cnt = 400;
- final int threads = 10;
-
- final CountDownLatch l1 = new CountDownLatch(threads);
-
- IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- int idx = idxGen.getAndIncrement();
-
- futs.add(ldr.addData(idx, idx));
- }
-
- l1.countDown();
-
- for (IgniteFuture<?> fut : futs)
- fut.get();
-
- return null;
- }
- }, threads);
-
- l1.await();
-
- // This will wait until data streamer finishes loading.
- stopGrid(getTestGridName(1), false);
-
- f1.get();
-
- int s2 = internalCache(2).primaryKeySet().size();
- int s3 = internalCache(3).primaryKeySet().size();
- int total = threads * cnt;
-
- assertEquals(total, s2 + s3);
-
- final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
-
- rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
-
- final CountDownLatch l2 = new CountDownLatch(threads);
-
- IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- final int key = idxGen.decrementAndGet();
-
- futs.add(rmvLdr.removeData(key));
- }
-
- l2.countDown();
-
- for (IgniteFuture<?> fut : futs)
- fut.get();
-
- return null;
- }
- }, threads);
-
- l2.await();
-
- rmvLdr.close(false);
-
- f2.get();
-
- s2 = internalCache(2).primaryKeySet().size();
- s3 = internalCache(3).primaryKeySet().size();
-
- assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionedIsolated() throws Exception {
- mode = PARTITIONED;
-
- checkIsolatedDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicatedIsolated() throws Exception {
- mode = REPLICATED;
-
- checkIsolatedDataStreamer();
- }
-
- /**
- * @throws Exception If failed.
- */
- private void checkIsolatedDataStreamer() throws Exception {
- try {
- useCache = true;
-
- Ignite g1 = startGrid(0);
- startGrid(1);
- startGrid(2);
-
- awaitPartitionMapExchange();
-
- GridCache<Integer, Integer> cache = ((IgniteKernal)grid(0)).cache(null);
-
- for (int i = 0; i < 100; i++)
- cache.put(i, -1);
-
- final int cnt = 40_000;
- final int threads = 10;
-
- try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) {
- final AtomicInteger idxGen = new AtomicInteger();
-
- IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- for (int i = 0; i < cnt; i++) {
- int idx = idxGen.getAndIncrement();
-
- ldr.addData(idx, idx);
- }
-
- return null;
- }
- }, threads);
-
- f1.get();
- }
-
- for (int g = 0; g < 3; g++) {
- ClusterNode locNode = grid(g).localNode();
-
- GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null);
-
- if (cache0.isNear())
- cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht();
-
- CacheAffinity<Integer> aff = cache0.affinity();
-
- for (int key = 0; key < cnt * threads; key++) {
- if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) {
- GridCacheEntryEx<Integer, Integer> entry = cache0.peekEx(key);
-
- assertNotNull("Missing entry for key: " + key, entry);
- assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false));
- }
- }
- }
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * Test primitive arrays can be passed into data streamer.
- *
- * @throws Exception If failed.
- */
- public void testPrimitiveArrays() throws Exception {
- try {
- useCache = true;
- mode = PARTITIONED;
-
- Ignite g1 = startGrid(1);
- startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
-
- List<Object> arrays = Arrays.<Object>asList(
- new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
- new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
-
- IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
-
- for (int i = 0, size = arrays.size(); i < 1000; i++) {
- Object arr = arrays.get(i % size);
-
- dataLdr.addData(i, arr);
- dataLdr.addData(i, fixedClosure(arr));
- }
-
- dataLdr.close(false);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testReplicatedMultiThreaded() throws Exception {
- mode = REPLICATED;
-
- checkLoaderMultithreaded(1, 2);
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPartitionedMultiThreaded() throws Exception {
- mode = PARTITIONED;
-
- checkLoaderMultithreaded(1, 3);
- }
-
- /**
- * Tests loader in multithreaded environment with various count of grids started.
- *
- * @param nodesCntNoCache How many nodes should be started without cache.
- * @param nodesCntCache How many nodes should be started with cache.
- * @throws Exception If failed.
- */
- protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
- throws Exception {
- try {
- // Start all required nodes.
- int idx = 1;
-
- for (int i = 0; i < nodesCntNoCache; i++)
- startGrid(idx++);
-
- useCache = true;
-
- for (int i = 0; i < nodesCntCache; i++)
- startGrid(idx++);
-
- Ignite g1 = grid(1);
-
- // Get and configure loader.
- final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
-
- ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
- ldr.perNodeBufferSize(2);
-
- // Define count of puts.
- final AtomicInteger idxGen = new AtomicInteger();
-
- final AtomicBoolean done = new AtomicBoolean();
-
- try {
- final int totalPutCnt = 50000;
-
- IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- Collection<IgniteFuture<?>> futs = new ArrayList<>();
-
- while (!done.get()) {
- int idx = idxGen.getAndIncrement();
-
- if (idx >= totalPutCnt) {
- info(">>> Stopping producer thread since maximum count of puts reached.");
-
- break;
- }
-
- futs.add(ldr.addData(idx, idx));
- }
-
- ldr.flush();
-
- for (IgniteFuture<?> fut : futs)
- fut.get();
-
- return null;
- }
- }, 5, "producer");
-
- IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- while (!done.get()) {
- ldr.flush();
-
- U.sleep(100);
- }
-
- return null;
- }
- }, 1, "flusher");
-
- // Define index of node being restarted.
- final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
-
- IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- try {
- for (int i = 0; i < 5; i++) {
- Ignite g = startGrid(restartNodeIdx);
-
- UUID id = g.cluster().localNode().id();
-
- info(">>>>>>> Started node: " + id);
-
- U.sleep(1000);
-
- stopGrid(getTestGridName(restartNodeIdx), true);
-
- info(">>>>>>> Stopped node: " + id);
- }
- }
- finally {
- done.set(true);
-
- info("Start stop thread finished.");
- }
-
- return null;
- }
- }, 1, "start-stop-thread");
-
- fut1.get();
- fut2.get();
- fut3.get();
- }
- finally {
- ldr.close(false);
- }
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testLoaderApi() throws Exception {
- useCache = true;
-
- try {
- Ignite g1 = startGrid(1);
-
- IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
-
- ldr.close(false);
-
- try {
- ldr.addData(0, 0);
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- assert ldr.future().isDone();
-
- ldr.future().get();
-
- try {
- // Create another loader.
- ldr = g1.dataStreamer("UNKNOWN_CACHE");
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- ldr.close(true);
-
- assert ldr.future().isDone();
-
- ldr.future().get();
-
- // Create another loader.
- ldr = g1.dataStreamer(null);
-
- // Cancel with future.
- ldr.future().cancel();
-
- try {
- ldr.addData(0, 0);
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- assert ldr.future().isDone();
-
- try {
- ldr.future().get();
-
- assert false;
- }
- catch (IgniteFutureCancelledException e) {
- info("Caught expected exception: " + e);
- }
-
- // Create another loader.
- ldr = g1.dataStreamer(null);
-
- // This will close loader.
- stopGrid(getTestGridName(1), false);
-
- try {
- ldr.addData(0, 0);
-
- assert false;
- }
- catch (IllegalStateException e) {
- info("Caught expected exception: " + e);
- }
-
- assert ldr.future().isDone();
-
- ldr.future().get();
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * Wraps integer to closure returning it.
- *
- * @param i Value to wrap.
- * @return Callable.
- */
- private static Callable<Integer> callable(@Nullable final Integer i) {
- return new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return i;
- }
- };
- }
-
- /**
- * Wraps integer to closure returning it.
- *
- * @param i Value to wrap.
- * @return Closure.
- */
- private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
- return new IgniteClosure<Integer, Integer>() {
- @Override public Integer apply(Integer e) {
- return e == null ? i : e + i;
- }
- };
- }
-
- /**
- * Wraps object to closure returning it.
- *
- * @param obj Value to wrap.
- * @return Closure.
- */
- private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
- return new IgniteClosure<T, T>() {
- @Override public T apply(T e) {
- assert e == null || obj == null || e.getClass() == obj.getClass() :
- "Expects the same types [e=" + e + ", obj=" + obj + ']';
-
- return obj;
- }
- };
- }
-
- /**
- * Wraps integer to closure expecting it and returning {@code null}.
- *
- * @param exp Expected closure value.
- * @return Remove expected cache value closure.
- */
- private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
- return new IgniteClosure<T, T>() {
- @Override public T apply(T act) {
- if (exp == null ? act == null : exp.equals(act))
- return null;
-
- throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
- }
- };
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFlush() throws Exception {
- mode = LOCAL;
-
- useCache = true;
-
- try {
- Ignite g = startGrid();
-
- final IgniteCache<Integer, Integer> c = g.jcache(null);
-
- final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
- ldr.perNodeBufferSize(10);
-
- for (int i = 0; i < 9; i++)
- ldr.addData(i, i);
-
- assertTrue(c.localSize() == 0);
-
- multithreaded(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- ldr.flush();
-
- assertEquals(9, c.size());
-
- return null;
- }
- }, 5, "flush-checker");
-
- ldr.addData(100, 100);
-
- ldr.flush();
-
- assertEquals(10, c.size());
-
- ldr.addData(200, 200);
-
- ldr.close(false);
-
- ldr.future().get();
-
- assertEquals(11, c.size());
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testTryFlush() throws Exception {
- mode = LOCAL;
-
- useCache = true;
-
- try {
- Ignite g = startGrid();
-
- IgniteCache<Integer, Integer> c = g.jcache(null);
-
- IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
- ldr.perNodeBufferSize(10);
-
- for (int i = 0; i < 9; i++)
- ldr.addData(i, i);
-
- assertTrue(c.localSize() == 0);
-
- ldr.tryFlush();
-
- Thread.sleep(100);
-
- assertEquals(9, c.size());
-
- ldr.close(false);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testFlushTimeout() throws Exception {
- mode = LOCAL;
-
- useCache = true;
-
- try {
- Ignite g = startGrid();
-
- final CountDownLatch latch = new CountDownLatch(9);
-
- g.events().localListen(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- latch.countDown();
-
- return true;
- }
- }, EVT_CACHE_OBJECT_PUT);
-
- IgniteCache<Integer, Integer> c = g.jcache(null);
-
- assertTrue(c.localSize() == 0);
-
- IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
-
- ldr.perNodeBufferSize(10);
- ldr.autoFlushFrequency(3000);
- ldr.allowOverwrite(true);
-
- for (int i = 0; i < 9; i++)
- ldr.addData(i, i);
-
- assertTrue(c.localSize() == 0);
-
- assertFalse(latch.await(1000, MILLISECONDS));
-
- assertTrue(c.localSize() == 0);
-
- assertTrue(latch.await(3000, MILLISECONDS));
-
- assertEquals(9, c.size());
-
- ldr.close(false);
- }
- finally {
- stopAllGrids();
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testUpdateStore() throws Exception {
- storeMap = new ConcurrentHashMap<>();
-
- try {
- store = new TestStore();
-
- useCache = true;
-
- Ignite ignite = startGrid(1);
-
- startGrid(2);
- startGrid(3);
-
- for (int i = 0; i < 1000; i++)
- storeMap.put(i, i);
-
- try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
- ldr.allowOverwrite(true);
-
- assertFalse(ldr.skipStore());
-
- for (int i = 0; i < 1000; i++)
- ldr.removeData(i);
-
- for (int i = 1000; i < 2000; i++)
- ldr.addData(i, i);
- }
-
- for (int i = 0; i < 1000; i++)
- assertNull(storeMap.get(i));
-
- for (int i = 1000; i < 2000; i++)
- assertEquals(i, storeMap.get(i));
-
- try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
- ldr.allowOverwrite(true);
-
- ldr.skipStore(true);
-
- for (int i = 0; i < 1000; i++)
- ldr.addData(i, i);
-
- for (int i = 1000; i < 2000; i++)
- ldr.removeData(i);
- }
-
- IgniteCache<Object, Object> cache = ignite.jcache(null);
-
- for (int i = 0; i < 1000; i++) {
- assertNull(storeMap.get(i));
-
- assertEquals(i, cache.get(i));
- }
-
- for (int i = 1000; i < 2000; i++) {
- assertEquals(i, storeMap.get(i));
-
- assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
- }
- }
- finally {
- storeMap = null;
- }
- }
-
- /**
- *
- */
- private static class TestObject {
- /** Value. */
- private final int val;
-
- /**
- * @param val Value.
- */
- private TestObject(int val) {
- this.val = val;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- TestObject obj = (TestObject)o;
-
- return val == obj.val;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return val;
- }
- }
-
- /**
- *
- */
- private static class TestStore extends CacheStoreAdapter<Object, Object> {
- /** {@inheritDoc} */
- @Nullable @Override public Object load(Object key) {
- return storeMap.get(key);
- }
-
- /** {@inheritDoc} */
- @Override public void write(Cache.Entry<?, ?> entry) {
- storeMap.put(entry.getKey(), entry.getValue());
- }
-
- /** {@inheritDoc} */
- @Override public void delete(Object key) {
- storeMap.remove(key);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
new file mode 100644
index 0000000..63b2c248
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImplSelfTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests for {@code IgniteDataStreamerImpl}.
+ */
+public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Number of keys to load via data streamer. */
+ private static final int KEYS_COUNT = 1000;
+
+ /** Started grid counter. */
+ private static int cnt;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ // Forth node goes without cache.
+ if (cnt < 4)
+ cfg.setCacheConfiguration(cacheConfiguration());
+
+ cnt++;
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
+ try {
+ startGrids(5);
+
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+
+ multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ U.awaitQuiet(barrier);
+
+ G.stopAll(true);
+
+ return null;
+ }
+ }, 1);
+
+ Ignite g4 = grid(4);
+
+ IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
+
+ dataLdr.perNodeBufferSize(32);
+
+ for (int i = 0; i < 100000; i += 2) {
+ dataLdr.addData(i, i);
+ dataLdr.removeData(i + 1);
+ }
+
+ U.awaitQuiet(barrier);
+
+ info("Closing data streamer.");
+
+ try {
+ dataLdr.close(true);
+ }
+ catch (IllegalStateException ignore) {
+ // This is ok to ignore this exception as test is racy by it's nature -
+ // grid is stopping in different thread.
+ }
+ }
+ finally {
+ G.stopAll(true);
+ }
+ }
+
+ /**
+ * Data streamer should correctly load entries from HashMap in case of grids with more than one node
+ * and with GridOptimizedMarshaller that requires serializable.
+ *
+ * @throws Exception If failed.
+ */
+ public void testAddDataFromMap() throws Exception {
+ try {
+ cnt = 0;
+
+ startGrids(2);
+
+ Ignite g0 = grid(0);
+
+ Marshaller marsh = g0.configuration().getMarshaller();
+
+ if (marsh instanceof OptimizedMarshaller)
+ assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable());
+ else
+ fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
+
+ IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
+
+ Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
+
+ for (int i = 0; i < KEYS_COUNT; i ++)
+ map.put(i, String.valueOf(i));
+
+ dataLdr.addData(map);
+
+ dataLdr.close();
+
+ Random rnd = new Random();
+
+ IgniteCache<Integer, String> c = g0.jcache(null);
+
+ for (int i = 0; i < KEYS_COUNT; i ++) {
+ Integer k = rnd.nextInt(KEYS_COUNT);
+
+ String v = c.get(k);
+
+ assertEquals(k.toString(), v);
+ }
+ }
+ finally {
+ G.stopAll(true);
+ }
+ }
+
+ /**
+ * Gets cache configuration.
+ *
+ * @return Cache configuration.
+ */
+ private CacheConfiguration cacheConfiguration() {
+ CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(PARTITIONED);
+ cacheCfg.setBackups(1);
+ cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ return cacheCfg;
+ }
+
+ /**
+ *
+ */
+ private static class TestObject implements Serializable {
+ /** */
+ private int val;
+
+ /**
+ */
+ private TestObject() {
+ // No-op.
+ }
+
+ /**
+ * @param val Value.
+ */
+ private TestObject(int val) {
+ this.val = val;
+ }
+
+ public Integer val() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof TestObject && ((TestObject)obj).val == val;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
new file mode 100644
index 0000000..33fe310
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerPerformanceTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jdk8.backport.*;
+
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Data streamer performance test. Compares group lock data streamer to traditional lock.
+ * <p>
+ * Disable assertions and give at least 2 GB heap to run this test.
+ */
+public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int GRID_CNT = 3;
+
+ /** */
+ private static final int ENTRY_CNT = 80000;
+
+ /** */
+ private boolean useCache;
+
+ /** */
+ private String[] vals = new String[2048];
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(spi);
+
+ cfg.setIncludeProperties();
+
+ cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
+
+ cfg.setConnectorConfiguration(null);
+
+ cfg.setPeerClassLoadingEnabled(true);
+
+ if (useCache) {
+ CacheConfiguration cc = defaultCacheConfiguration();
+
+ cc.setCacheMode(PARTITIONED);
+
+ cc.setDistributionMode(PARTITIONED_ONLY);
+ cc.setWriteSynchronizationMode(FULL_SYNC);
+ cc.setStartSize(ENTRY_CNT / GRID_CNT);
+ cc.setSwapEnabled(false);
+
+ cc.setBackups(1);
+
+ cc.setStoreValueBytes(true);
+
+ cfg.setCacheSanityCheckEnabled(false);
+ cfg.setCacheConfiguration(cc);
+ }
+ else
+ cfg.setCacheConfiguration();
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ for (int i = 0; i < vals.length; i++) {
+ int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
+
+ StringBuilder sb = new StringBuilder();
+
+ for (int j = 0; j < valLen; j++)
+ sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
+
+ vals[i] = sb.toString();
+
+ info("Value: " + vals[i]);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPerformance() throws Exception {
+ doTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void doTest() throws Exception {
+ System.gc();
+ System.gc();
+ System.gc();
+
+ try {
+ useCache = true;
+
+ startGridsMultiThreaded(GRID_CNT);
+
+ useCache = false;
+
+ Ignite ignite = startGrid();
+
+ final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
+
+ ldr.perNodeBufferSize(8192);
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted());
+ ldr.autoFlushFrequency(0);
+
+ final LongAdder cnt = new LongAdder();
+
+ long start = U.currentTimeMillis();
+
+ Thread t = new Thread(new Runnable() {
+ @SuppressWarnings("BusyWait")
+ @Override public void run() {
+ while (true) {
+ try {
+ Thread.sleep(10000);
+ }
+ catch (InterruptedException ignored) {
+ break;
+ }
+
+ info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
+ }
+ }
+ });
+
+ t.setDaemon(true);
+
+ t.start();
+
+ int threadNum = 2;//Runtime.getRuntime().availableProcessors();
+
+ multithreaded(new Callable<Object>() {
+ @SuppressWarnings("InfiniteLoopStatement")
+ @Override public Object call() throws Exception {
+ ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
+
+ while (true) {
+ int i = rnd.nextInt(ENTRY_CNT);
+
+ ldr.addData(i, vals[rnd.nextInt(vals.length)]);
+
+ cnt.increment();
+ }
+ }
+ }, threadNum, "loader");
+
+ info("Closing loader...");
+
+ ldr.close(false);
+
+ long duration = U.currentTimeMillis() - start;
+
+ info("Finished performance test. Duration: " + duration + "ms.");
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
new file mode 100644
index 0000000..9402c0c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessorSelfTest.java
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheDistributionMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteDataStreamerProcessorSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static ConcurrentHashMap<Object, Object> storeMap;
+
+ /** */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private CacheMode mode = PARTITIONED;
+
+ /** */
+ private boolean nearEnabled = true;
+
+ /** */
+ private boolean useCache;
+
+ /** */
+ private TestStore store;
+
+ /** {@inheritDoc} */
+ @Override public void afterTest() throws Exception {
+ super.afterTest();
+
+ useCache = false;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"IfMayBeConditional", "unchecked"})
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(spi);
+
+ cfg.setIncludeProperties();
+
+ cfg.setMarshaller(new OptimizedMarshaller(false));
+
+ if (useCache) {
+ CacheConfiguration cc = defaultCacheConfiguration();
+
+ cc.setCacheMode(mode);
+ cc.setAtomicityMode(TRANSACTIONAL);
+ cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY);
+ cc.setWriteSynchronizationMode(FULL_SYNC);
+
+ cc.setEvictSynchronized(false);
+ cc.setEvictNearSynchronized(false);
+
+ if (store != null) {
+ cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+ cc.setReadThrough(true);
+ cc.setWriteThrough(true);
+ }
+
+ cfg.setCacheConfiguration(cc);
+ }
+ else
+ cfg.setCacheConfiguration();
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitioned() throws Exception {
+ mode = PARTITIONED;
+
+ checkDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testColocated() throws Exception {
+ mode = PARTITIONED;
+ nearEnabled = false;
+
+ checkDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicated() throws Exception {
+ mode = REPLICATED;
+
+ checkDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLocal() throws Exception {
+ mode = LOCAL;
+
+ try {
+ checkDataStreamer();
+
+ assert false;
+ }
+ catch (IgniteCheckedException e) {
+ // Cannot load local cache configured remotely.
+ info("Caught expected exception: " + e);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ErrorNotRethrown")
+ private void checkDataStreamer() throws Exception {
+ try {
+ Ignite g1 = startGrid(1);
+
+ useCache = true;
+
+ Ignite g2 = startGrid(2);
+ startGrid(3);
+
+ final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
+
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
+
+ final AtomicInteger idxGen = new AtomicInteger();
+ final int cnt = 400;
+ final int threads = 10;
+
+ final CountDownLatch l1 = new CountDownLatch(threads);
+
+ IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ int idx = idxGen.getAndIncrement();
+
+ futs.add(ldr.addData(idx, idx));
+ }
+
+ l1.countDown();
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ return null;
+ }
+ }, threads);
+
+ l1.await();
+
+ // This will wait until data streamer finishes loading.
+ stopGrid(getTestGridName(1), false);
+
+ f1.get();
+
+ int s2 = internalCache(2).primaryKeySet().size();
+ int s3 = internalCache(3).primaryKeySet().size();
+ int total = threads * cnt;
+
+ assertEquals(total, s2 + s3);
+
+ final IgniteDataStreamer<Integer, Integer> rmvLdr = g2.dataStreamer(null);
+
+ rmvLdr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>batchedSorted());
+
+ final CountDownLatch l2 = new CountDownLatch(threads);
+
+ IgniteInternalFuture<?> f2 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ final int key = idxGen.decrementAndGet();
+
+ futs.add(rmvLdr.removeData(key));
+ }
+
+ l2.countDown();
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ return null;
+ }
+ }, threads);
+
+ l2.await();
+
+ rmvLdr.close(false);
+
+ f2.get();
+
+ s2 = internalCache(2).primaryKeySet().size();
+ s3 = internalCache(3).primaryKeySet().size();
+
+ assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']';
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedIsolated() throws Exception {
+ mode = PARTITIONED;
+
+ checkIsolatedDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedIsolated() throws Exception {
+ mode = REPLICATED;
+
+ checkIsolatedDataStreamer();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void checkIsolatedDataStreamer() throws Exception {
+ try {
+ useCache = true;
+
+ Ignite g1 = startGrid(0);
+ startGrid(1);
+ startGrid(2);
+
+ awaitPartitionMapExchange();
+
+ GridCache<Integer, Integer> cache = ((IgniteKernal)grid(0)).cache(null);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, -1);
+
+ final int cnt = 40_000;
+ final int threads = 10;
+
+ try (final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null)) {
+ final AtomicInteger idxGen = new AtomicInteger();
+
+ IgniteInternalFuture<?> f1 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ int idx = idxGen.getAndIncrement();
+
+ ldr.addData(idx, idx);
+ }
+
+ return null;
+ }
+ }, threads);
+
+ f1.get();
+ }
+
+ for (int g = 0; g < 3; g++) {
+ ClusterNode locNode = grid(g).localNode();
+
+ GridCacheAdapter<Integer, Integer> cache0 = ((IgniteKernal)grid(g)).internalCache(null);
+
+ if (cache0.isNear())
+ cache0 = ((GridNearCacheAdapter<Integer, Integer>)cache0).dht();
+
+ CacheAffinity<Integer> aff = cache0.affinity();
+
+ for (int key = 0; key < cnt * threads; key++) {
+ if (aff.isPrimary(locNode, key) || aff.isBackup(locNode, key)) {
+ GridCacheEntryEx<Integer, Integer> entry = cache0.peekEx(key);
+
+ assertNotNull("Missing entry for key: " + key, entry);
+ assertEquals((Integer)(key < 100 ? -1 : key), entry.rawGetOrUnmarshal(false));
+ }
+ }
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Test primitive arrays can be passed into data streamer.
+ *
+ * @throws Exception If failed.
+ */
+ public void testPrimitiveArrays() throws Exception {
+ try {
+ useCache = true;
+ mode = PARTITIONED;
+
+ Ignite g1 = startGrid(1);
+ startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used).
+
+ List<Object> arrays = Arrays.<Object>asList(
+ new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4},
+ new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8});
+
+ IgniteDataStreamer<Object, Object> dataLdr = g1.dataStreamer(null);
+
+ for (int i = 0, size = arrays.size(); i < 1000; i++) {
+ Object arr = arrays.get(i % size);
+
+ dataLdr.addData(i, arr);
+ dataLdr.addData(i, fixedClosure(arr));
+ }
+
+ dataLdr.close(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testReplicatedMultiThreaded() throws Exception {
+ mode = REPLICATED;
+
+ checkLoaderMultithreaded(1, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPartitionedMultiThreaded() throws Exception {
+ mode = PARTITIONED;
+
+ checkLoaderMultithreaded(1, 3);
+ }
+
+ /**
+ * Tests loader in multithreaded environment with various count of grids started.
+ *
+ * @param nodesCntNoCache How many nodes should be started without cache.
+ * @param nodesCntCache How many nodes should be started with cache.
+ * @throws Exception If failed.
+ */
+ protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache)
+ throws Exception {
+ try {
+ // Start all required nodes.
+ int idx = 1;
+
+ for (int i = 0; i < nodesCntNoCache; i++)
+ startGrid(idx++);
+
+ useCache = true;
+
+ for (int i = 0; i < nodesCntCache; i++)
+ startGrid(idx++);
+
+ Ignite g1 = grid(1);
+
+ // Get and configure loader.
+ final IgniteDataStreamer<Integer, Integer> ldr = g1.dataStreamer(null);
+
+ ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, Integer>individual());
+ ldr.perNodeBufferSize(2);
+
+ // Define count of puts.
+ final AtomicInteger idxGen = new AtomicInteger();
+
+ final AtomicBoolean done = new AtomicBoolean();
+
+ try {
+ final int totalPutCnt = 50000;
+
+ IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ Collection<IgniteFuture<?>> futs = new ArrayList<>();
+
+ while (!done.get()) {
+ int idx = idxGen.getAndIncrement();
+
+ if (idx >= totalPutCnt) {
+ info(">>> Stopping producer thread since maximum count of puts reached.");
+
+ break;
+ }
+
+ futs.add(ldr.addData(idx, idx));
+ }
+
+ ldr.flush();
+
+ for (IgniteFuture<?> fut : futs)
+ fut.get();
+
+ return null;
+ }
+ }, 5, "producer");
+
+ IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!done.get()) {
+ ldr.flush();
+
+ U.sleep(100);
+ }
+
+ return null;
+ }
+ }, 1, "flusher");
+
+ // Define index of node being restarted.
+ final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1;
+
+ IgniteInternalFuture<?> fut3 = multithreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ for (int i = 0; i < 5; i++) {
+ Ignite g = startGrid(restartNodeIdx);
+
+ UUID id = g.cluster().localNode().id();
+
+ info(">>>>>>> Started node: " + id);
+
+ U.sleep(1000);
+
+ stopGrid(getTestGridName(restartNodeIdx), true);
+
+ info(">>>>>>> Stopped node: " + id);
+ }
+ }
+ finally {
+ done.set(true);
+
+ info("Start stop thread finished.");
+ }
+
+ return null;
+ }
+ }, 1, "start-stop-thread");
+
+ fut1.get();
+ fut2.get();
+ fut3.get();
+ }
+ finally {
+ ldr.close(false);
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testLoaderApi() throws Exception {
+ useCache = true;
+
+ try {
+ Ignite g1 = startGrid(1);
+
+ IgniteDataStreamer<Object, Object> ldr = g1.dataStreamer(null);
+
+ ldr.close(false);
+
+ try {
+ ldr.addData(0, 0);
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ assert ldr.future().isDone();
+
+ ldr.future().get();
+
+ try {
+ // Create another loader.
+ ldr = g1.dataStreamer("UNKNOWN_CACHE");
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ ldr.close(true);
+
+ assert ldr.future().isDone();
+
+ ldr.future().get();
+
+ // Create another loader.
+ ldr = g1.dataStreamer(null);
+
+ // Cancel with future.
+ ldr.future().cancel();
+
+ try {
+ ldr.addData(0, 0);
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ assert ldr.future().isDone();
+
+ try {
+ ldr.future().get();
+
+ assert false;
+ }
+ catch (IgniteFutureCancelledException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ // Create another loader.
+ ldr = g1.dataStreamer(null);
+
+ // This will close loader.
+ stopGrid(getTestGridName(1), false);
+
+ try {
+ ldr.addData(0, 0);
+
+ assert false;
+ }
+ catch (IllegalStateException e) {
+ info("Caught expected exception: " + e);
+ }
+
+ assert ldr.future().isDone();
+
+ ldr.future().get();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * Wraps integer to closure returning it.
+ *
+ * @param i Value to wrap.
+ * @return Callable.
+ */
+ private static Callable<Integer> callable(@Nullable final Integer i) {
+ return new Callable<Integer>() {
+ @Override public Integer call() throws Exception {
+ return i;
+ }
+ };
+ }
+
+ /**
+ * Wraps integer to closure returning it.
+ *
+ * @param i Value to wrap.
+ * @return Closure.
+ */
+ private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) {
+ return new IgniteClosure<Integer, Integer>() {
+ @Override public Integer apply(Integer e) {
+ return e == null ? i : e + i;
+ }
+ };
+ }
+
+ /**
+ * Wraps object to closure returning it.
+ *
+ * @param obj Value to wrap.
+ * @return Closure.
+ */
+ private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) {
+ return new IgniteClosure<T, T>() {
+ @Override public T apply(T e) {
+ assert e == null || obj == null || e.getClass() == obj.getClass() :
+ "Expects the same types [e=" + e + ", obj=" + obj + ']';
+
+ return obj;
+ }
+ };
+ }
+
+ /**
+ * Wraps integer to closure expecting it and returning {@code null}.
+ *
+ * @param exp Expected closure value.
+ * @return Remove expected cache value closure.
+ */
+ private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) {
+ return new IgniteClosure<T, T>() {
+ @Override public T apply(T act) {
+ if (exp == null ? act == null : exp.equals(act))
+ return null;
+
+ throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']');
+ }
+ };
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFlush() throws Exception {
+ mode = LOCAL;
+
+ useCache = true;
+
+ try {
+ Ignite g = startGrid();
+
+ final IgniteCache<Integer, Integer> c = g.jcache(null);
+
+ final IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+ ldr.perNodeBufferSize(10);
+
+ for (int i = 0; i < 9; i++)
+ ldr.addData(i, i);
+
+ assertTrue(c.localSize() == 0);
+
+ multithreaded(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ ldr.flush();
+
+ assertEquals(9, c.size());
+
+ return null;
+ }
+ }, 5, "flush-checker");
+
+ ldr.addData(100, 100);
+
+ ldr.flush();
+
+ assertEquals(10, c.size());
+
+ ldr.addData(200, 200);
+
+ ldr.close(false);
+
+ ldr.future().get();
+
+ assertEquals(11, c.size());
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTryFlush() throws Exception {
+ mode = LOCAL;
+
+ useCache = true;
+
+ try {
+ Ignite g = startGrid();
+
+ IgniteCache<Integer, Integer> c = g.jcache(null);
+
+ IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+ ldr.perNodeBufferSize(10);
+
+ for (int i = 0; i < 9; i++)
+ ldr.addData(i, i);
+
+ assertTrue(c.localSize() == 0);
+
+ ldr.tryFlush();
+
+ Thread.sleep(100);
+
+ assertEquals(9, c.size());
+
+ ldr.close(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFlushTimeout() throws Exception {
+ mode = LOCAL;
+
+ useCache = true;
+
+ try {
+ Ignite g = startGrid();
+
+ final CountDownLatch latch = new CountDownLatch(9);
+
+ g.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ latch.countDown();
+
+ return true;
+ }
+ }, EVT_CACHE_OBJECT_PUT);
+
+ IgniteCache<Integer, Integer> c = g.jcache(null);
+
+ assertTrue(c.localSize() == 0);
+
+ IgniteDataStreamer<Integer, Integer> ldr = g.dataStreamer(null);
+
+ ldr.perNodeBufferSize(10);
+ ldr.autoFlushFrequency(3000);
+ ldr.allowOverwrite(true);
+
+ for (int i = 0; i < 9; i++)
+ ldr.addData(i, i);
+
+ assertTrue(c.localSize() == 0);
+
+ assertFalse(latch.await(1000, MILLISECONDS));
+
+ assertTrue(c.localSize() == 0);
+
+ assertTrue(latch.await(3000, MILLISECONDS));
+
+ assertEquals(9, c.size());
+
+ ldr.close(false);
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testUpdateStore() throws Exception {
+ storeMap = new ConcurrentHashMap<>();
+
+ try {
+ store = new TestStore();
+
+ useCache = true;
+
+ Ignite ignite = startGrid(1);
+
+ startGrid(2);
+ startGrid(3);
+
+ for (int i = 0; i < 1000; i++)
+ storeMap.put(i, i);
+
+ try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+ ldr.allowOverwrite(true);
+
+ assertFalse(ldr.skipStore());
+
+ for (int i = 0; i < 1000; i++)
+ ldr.removeData(i);
+
+ for (int i = 1000; i < 2000; i++)
+ ldr.addData(i, i);
+ }
+
+ for (int i = 0; i < 1000; i++)
+ assertNull(storeMap.get(i));
+
+ for (int i = 1000; i < 2000; i++)
+ assertEquals(i, storeMap.get(i));
+
+ try (IgniteDataStreamer<Object, Object> ldr = ignite.dataStreamer(null)) {
+ ldr.allowOverwrite(true);
+
+ ldr.skipStore(true);
+
+ for (int i = 0; i < 1000; i++)
+ ldr.addData(i, i);
+
+ for (int i = 1000; i < 2000; i++)
+ ldr.removeData(i);
+ }
+
+ IgniteCache<Object, Object> cache = ignite.jcache(null);
+
+ for (int i = 0; i < 1000; i++) {
+ assertNull(storeMap.get(i));
+
+ assertEquals(i, cache.get(i));
+ }
+
+ for (int i = 1000; i < 2000; i++) {
+ assertEquals(i, storeMap.get(i));
+
+ assertNull(cache.localPeek(i, CachePeekMode.ONHEAP));
+ }
+ }
+ finally {
+ storeMap = null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestObject {
+ /** Value. */
+ private final int val;
+
+ /**
+ * @param val Value.
+ */
+ private TestObject(int val) {
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TestObject obj = (TestObject)o;
+
+ return val == obj.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestStore extends CacheStoreAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Object load(Object key) {
+ return storeMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry<?, ?> entry) {
+ storeMap.put(entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ storeMap.remove(key);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 30285a8..aea55df 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.preloa
import org.apache.ignite.internal.processors.cache.expiry.*;
import org.apache.ignite.internal.processors.cache.integration.*;
import org.apache.ignite.internal.processors.cache.local.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
/**
* Test suite.
[5/5] incubator-ignite git commit: # gg-9869
Posted by sb...@apache.org.
# gg-9869
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9c8217c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9c8217c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9c8217c1
Branch: refs/heads/ignite-394
Commit: 9c8217c17851487f783bf4fc05d75b4d2996c251
Parents: 96f426b
Author: Artem Shutak <as...@gridgain.com>
Authored: Wed Mar 4 15:27:48 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Wed Mar 4 15:27:48 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalContext.java | 2 +-
.../ignite/internal/GridKernalContextImpl.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../communication/GridIoMessageFactory.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 2 +-
.../GridDistributedCacheAdapter.java | 2 +-
.../dataload/GridDataLoadRequest.java | 450 ------
.../dataload/GridDataLoadResponse.java | 166 --
.../IgniteDataStreamerCacheUpdaters.java | 199 ---
.../dataload/IgniteDataStreamerFuture.java | 75 -
.../dataload/IgniteDataStreamerImpl.java | 1453 ------------------
.../dataload/IgniteDataStreamerProcessor.java | 316 ----
.../dataload/IgniteDataStreamerUpdateJob.java | 119 --
.../internal/processors/dataload/package.html | 24 -
.../datastream/GridDataLoadRequest.java | 450 ++++++
.../datastream/GridDataLoadResponse.java | 166 ++
.../IgniteDataStreamerCacheUpdaters.java | 199 +++
.../datastream/IgniteDataStreamerFuture.java | 75 +
.../datastream/IgniteDataStreamerImpl.java | 1453 ++++++++++++++++++
.../datastream/IgniteDataStreamerProcessor.java | 316 ++++
.../datastream/IgniteDataStreamerUpdateJob.java | 119 ++
.../internal/processors/datastream/package.html | 24 +
.../processors/igfs/IgfsDataManager.java | 2 +-
.../IgniteDataStreamerImplSelfTest.java | 214 ---
.../IgniteDataStreamerPerformanceTest.java | 199 ---
.../IgniteDataStreamerProcessorSelfTest.java | 924 -----------
.../IgniteDataStreamerImplSelfTest.java | 214 +++
.../IgniteDataStreamerPerformanceTest.java | 199 +++
.../IgniteDataStreamerProcessorSelfTest.java | 924 +++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +-
30 files changed, 4147 insertions(+), 4147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 616eac7..48752cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.closure.*;
import org.apache.ignite.internal.processors.cluster.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a38ca92..edc5e00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.closure.*;
import org.apache.ignite.internal.processors.cluster.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 60df162..a13da36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -41,7 +41,7 @@ import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.closure.*;
import org.apache.ignite.internal.processors.cluster.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.job.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6109d74..2079233 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.processors.rest.handlers.task.*;
import org.apache.ignite.internal.processors.streamer.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 83118c4..dd9efd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.dr.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.transactions.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 16419f9..f745b5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
deleted file mode 100644
index b3828ed..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- *
- */
-public class GridDataLoadRequest implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long reqId;
-
- /** */
- private byte[] resTopicBytes;
-
- /** Cache name. */
- private String cacheName;
-
- /** */
- private byte[] updaterBytes;
-
- /** Entries to put. */
- private byte[] colBytes;
-
- /** {@code True} to ignore deployment ownership. */
- private boolean ignoreDepOwnership;
-
- /** */
- private boolean skipStore;
-
- /** */
- private DeploymentMode depMode;
-
- /** */
- private String sampleClsName;
-
- /** */
- private String userVer;
-
- /** Node class loader participants. */
- @GridToStringInclude
- @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
- private Map<UUID, IgniteUuid> ldrParticipants;
-
- /** */
- private IgniteUuid clsLdrId;
-
- /** */
- private boolean forceLocDep;
-
- /**
- * {@code Externalizable} support.
- */
- public GridDataLoadRequest() {
- // No-op.
- }
-
- /**
- * @param reqId Request ID.
- * @param resTopicBytes Response topic.
- * @param cacheName Cache name.
- * @param updaterBytes Cache updater.
- * @param colBytes Collection bytes.
- * @param ignoreDepOwnership Ignore ownership.
- * @param skipStore Skip store flag.
- * @param depMode Deployment mode.
- * @param sampleClsName Sample class name.
- * @param userVer User version.
- * @param ldrParticipants Loader participants.
- * @param clsLdrId Class loader ID.
- * @param forceLocDep Force local deployment.
- */
- public GridDataLoadRequest(long reqId,
- byte[] resTopicBytes,
- @Nullable String cacheName,
- byte[] updaterBytes,
- byte[] colBytes,
- boolean ignoreDepOwnership,
- boolean skipStore,
- DeploymentMode depMode,
- String sampleClsName,
- String userVer,
- Map<UUID, IgniteUuid> ldrParticipants,
- IgniteUuid clsLdrId,
- boolean forceLocDep) {
- this.reqId = reqId;
- this.resTopicBytes = resTopicBytes;
- this.cacheName = cacheName;
- this.updaterBytes = updaterBytes;
- this.colBytes = colBytes;
- this.ignoreDepOwnership = ignoreDepOwnership;
- this.skipStore = skipStore;
- this.depMode = depMode;
- this.sampleClsName = sampleClsName;
- this.userVer = userVer;
- this.ldrParticipants = ldrParticipants;
- this.clsLdrId = clsLdrId;
- this.forceLocDep = forceLocDep;
- }
-
- /**
- * @return Request ID.
- */
- public long requestId() {
- return reqId;
- }
-
- /**
- * @return Response topic.
- */
- public byte[] responseTopicBytes() {
- return resTopicBytes;
- }
-
- /**
- * @return Cache name.
- */
- public String cacheName() {
- return cacheName;
- }
-
- /**
- * @return Updater.
- */
- public byte[] updaterBytes() {
- return updaterBytes;
- }
-
- /**
- * @return Collection bytes.
- */
- public byte[] collectionBytes() {
- return colBytes;
- }
-
- /**
- * @return {@code True} to ignore ownership.
- */
- public boolean ignoreDeploymentOwnership() {
- return ignoreDepOwnership;
- }
-
- /**
- * @return Skip store flag.
- */
- public boolean skipStore() {
- return skipStore;
- }
-
- /**
- * @return Deployment mode.
- */
- public DeploymentMode deploymentMode() {
- return depMode;
- }
-
- /**
- * @return Sample class name.
- */
- public String sampleClassName() {
- return sampleClsName;
- }
-
- /**
- * @return User version.
- */
- public String userVersion() {
- return userVer;
- }
-
- /**
- * @return Participants.
- */
- public Map<UUID, IgniteUuid> participants() {
- return ldrParticipants;
- }
-
- /**
- * @return Class loader ID.
- */
- public IgniteUuid classLoaderId() {
- return clsLdrId;
- }
-
- /**
- * @return {@code True} to force local deployment.
- */
- public boolean forceLocalDeployment() {
- return forceLocDep;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDataLoadRequest.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeString("cacheName", cacheName))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByteArray("colBytes", colBytes))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeBoolean("forceLocDep", forceLocDep))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeLong("reqId", reqId))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeString("sampleClsName", sampleClsName))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeBoolean("skipStore", skipStore))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeByteArray("updaterBytes", updaterBytes))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeString("userVer", userVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- cacheName = reader.readString("cacheName");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- clsLdrId = reader.readIgniteUuid("clsLdrId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- colBytes = reader.readByteArray("colBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- byte depModeOrd;
-
- depModeOrd = reader.readByte("depMode");
-
- if (!reader.isLastRead())
- return false;
-
- depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
- reader.incrementState();
-
- case 4:
- forceLocDep = reader.readBoolean("forceLocDep");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- reqId = reader.readLong("reqId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- resTopicBytes = reader.readByteArray("resTopicBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- sampleClsName = reader.readString("sampleClsName");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- skipStore = reader.readBoolean("skipStore");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- updaterBytes = reader.readByteArray("updaterBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- userVer = reader.readString("userVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 62;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 13;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
deleted file mode 100644
index 835e3bd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.nio.*;
-
-/**
- *
- */
-public class GridDataLoadResponse implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long reqId;
-
- /** */
- private byte[] errBytes;
-
- /** */
- private boolean forceLocDep;
-
- /**
- * @param reqId Request ID.
- * @param errBytes Error bytes.
- * @param forceLocDep Force local deployment.
- */
- public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
- this.reqId = reqId;
- this.errBytes = errBytes;
- this.forceLocDep = forceLocDep;
- }
-
- /**
- * {@code Externalizable} support.
- */
- public GridDataLoadResponse() {
- // No-op.
- }
-
- /**
- * @return Request ID.
- */
- public long requestId() {
- return reqId;
- }
-
- /**
- * @return Error bytes.
- */
- public byte[] errorBytes() {
- return errBytes;
- }
-
- /**
- * @return {@code True} to force local deployment.
- */
- public boolean forceLocalDeployment() {
- return forceLocDep;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDataLoadResponse.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray("errBytes", errBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeBoolean("forceLocDep", forceLocDep))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong("reqId", reqId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- errBytes = reader.readByteArray("errBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- forceLocDep = reader.readBoolean("forceLocDep");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- reqId = reader.readLong("reqId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 63;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 3;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
deleted file mode 100644
index 1742041..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Bundled factory for cache updaters.
- */
-public class IgniteDataStreamerCacheUpdaters {
- /** */
- private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
-
- /** */
- private static final IgniteDataStreamer.Updater BATCHED = new Batched();
-
- /** */
- private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
-
- /**
- * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
- * is not the best.
- *
- * @return Single updater.
- */
- public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
- return INDIVIDUAL;
- }
-
- /**
- * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
- * updated concurrently. Performance is generally better than in {@link #individual()}.
- *
- * @return Batched updater.
- */
- public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
- return BATCHED;
- }
-
- /**
- * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
- * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
- *
- * @return Batched sorted updater.
- */
- public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
- return BATCHED_SORTED;
- }
-
- /**
- * Updates cache.
- *
- * @param cache Cache.
- * @param rmvCol Keys to remove.
- * @param putMap Entries to put.
- * @throws IgniteException If failed.
- */
- protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
- Map<K, V> putMap) {
- assert rmvCol != null || putMap != null;
-
- // Here we assume that there are no key duplicates, so the following calls are valid.
- if (rmvCol != null)
- ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
-
- if (putMap != null)
- cache.putAll(putMap);
- }
-
- /**
- * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
- */
- private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key != null;
-
- V val = entry.getValue();
-
- if (val == null)
- cache.remove(key);
- else
- cache.put(key, val);
- }
- }
- }
-
- /**
- * Batched updater. Updates cache using batch operations thus is dead lock prone.
- */
- private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- Map<K, V> putAll = null;
- Set<K> rmvAll = null;
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key != null;
-
- V val = entry.getValue();
-
- if (val == null) {
- if (rmvAll == null)
- rmvAll = new HashSet<>();
-
- rmvAll.add(key);
- }
- else {
- if (putAll == null)
- putAll = new HashMap<>();
-
- putAll.put(key, val);
- }
- }
-
- updateAll(cache, rmvAll, putAll);
- }
- }
-
- /**
- * Batched updater. Updates cache using batch operations thus is dead lock prone.
- */
- private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- Map<K, V> putAll = null;
- Set<K> rmvAll = null;
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key instanceof Comparable;
-
- V val = entry.getValue();
-
- if (val == null) {
- if (rmvAll == null)
- rmvAll = new TreeSet<>();
-
- rmvAll.add(key);
- }
- else {
- if (putAll == null)
- putAll = new TreeMap<>();
-
- putAll.put(key, val);
- }
- }
-
- updateAll(cache, rmvAll, putAll);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
deleted file mode 100644
index 5730655..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Data streamer future.
- */
-class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Data streamer. */
- @GridToStringExclude
- private IgniteDataStreamerImpl dataLdr;
-
- /**
- * Default constructor for {@link Externalizable} support.
- */
- public IgniteDataStreamerFuture() {
- // No-op.
- }
-
- /**
- * @param ctx Context.
- * @param dataLdr Data streamer.
- */
- IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
- super(ctx);
-
- assert dataLdr != null;
-
- this.dataLdr = dataLdr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean cancel() throws IgniteCheckedException {
- checkValid();
-
- if (onCancelled()) {
- dataLdr.closeEx(true);
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
deleted file mode 100644
index 1231e27..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
+++ /dev/null
@@ -1,1453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dr.*;
-import org.apache.ignite.internal.processors.portable.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.Map.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- * Data streamer implementation.
- */
-@SuppressWarnings("unchecked")
-public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
- /** Isolated updater. */
- private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
-
- /** Cache updater. */
- private Updater<K, V> updater = ISOLATED_UPDATER;
-
- /** */
- private byte[] updaterBytes;
-
- /** Max remap count before issuing an error. */
- private static final int DFLT_MAX_REMAP_CNT = 32;
-
- /** Log reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Cache name ({@code null} for default cache). */
- private final String cacheName;
-
- /** Portable enabled flag. */
- private final boolean portableEnabled;
-
- /**
- * If {@code true} then data will be transferred in compact format (only keys and values).
- * Otherwise full map entry will be transferred (this is requires by DR internal logic).
- */
- private final boolean compact;
-
- /** Per-node buffer size. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
-
- /** */
- private int parallelOps = DFLT_MAX_PARALLEL_OPS;
-
- /** */
- private long autoFlushFreq;
-
- /** Mapping. */
- @GridToStringInclude
- private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Discovery listener. */
- private final GridLocalEventListener discoLsnr;
-
- /** Context. */
- private final GridKernalContext ctx;
-
- /** Communication topic for responses. */
- private final Object topic;
-
- /** */
- private byte[] topicBytes;
-
- /** {@code True} if data streamer has been cancelled. */
- private volatile boolean cancelled;
-
- /** Active futures of this data streamer. */
- @GridToStringInclude
- private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
-
- /** Closure to remove from active futures. */
- @GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- boolean rmv = activeFuts.remove(t);
-
- assert rmv;
- }
- };
-
- /** Job peer deploy aware. */
- private volatile GridPeerDeployAware jobPda;
-
- /** Deployment class. */
- private Class<?> depCls;
-
- /** Future to track loading finish. */
- private final GridFutureAdapter<?> fut;
-
- /** Public API future to track loading finish. */
- private final IgniteFuture<?> publicFut;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /** Closed flag. */
- private final AtomicBoolean closed = new AtomicBoolean();
-
- /** */
- private volatile long lastFlushTime = U.currentTimeMillis();
-
- /** */
- private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
-
- /** */
- private boolean skipStore;
-
- /** */
- private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
-
- /**
- * @param ctx Grid kernal context.
- * @param cacheName Cache name.
- * @param flushQ Flush queue.
- * @param compact If {@code true} data is transferred in compact mode (only keys and values).
- * Otherwise full map entry will be transferred (this is required by DR internal logic).
- */
- public IgniteDataStreamerImpl(
- final GridKernalContext ctx,
- @Nullable final String cacheName,
- DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
- boolean compact
- ) {
- assert ctx != null;
-
- this.ctx = ctx;
- this.cacheName = cacheName;
- this.flushQ = flushQ;
- this.compact = compact;
-
- log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
-
- ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
- if (node == null)
- throw new IllegalStateException("Cache doesn't exist: " + cacheName);
-
- portableEnabled = ctx.portable().portableEnabled(node, cacheName);
-
- discoLsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- UUID id = discoEvt.eventNode().id();
-
- // Remap regular mappings.
- final Buffer buf = bufMappings.remove(id);
-
- if (buf != null) {
- // Only async notification is possible since
- // discovery thread may be trapped otherwise.
- ctx.closure().callLocalSafe(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- buf.onNodeLeft();
-
- return null;
- }
- },
- true /* system pool */
- );
- }
- }
- };
-
- ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
-
- // Generate unique topic for this loader.
- topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
-
- ctx.io().addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof GridDataLoadResponse;
-
- GridDataLoadResponse res = (GridDataLoadResponse)msg;
-
- if (log.isDebugEnabled())
- log.debug("Received data load response: " + res);
-
- Buffer buf = bufMappings.get(nodeId);
-
- if (buf != null)
- buf.onResponse(res);
-
- else if (log.isDebugEnabled())
- log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
- }
- });
-
- if (log.isDebugEnabled())
- log.debug("Added response listener within topic: " + topic);
-
- fut = new IgniteDataStreamerFuture(ctx, this);
-
- publicFut = new IgniteFutureImpl<>(fut);
- }
-
- /**
- * Enters busy lock.
- */
- private void enterBusy() {
- if (!busyLock.enterBusy())
- throw new IllegalStateException("Data streamer has been closed.");
- }
-
- /**
- * Leaves busy lock.
- */
- private void leaveBusy() {
- busyLock.leaveBusy();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> future() {
- return publicFut;
- }
-
- /**
- * @return Internal future.
- */
- public IgniteInternalFuture<?> internalFuture() {
- return fut;
- }
-
- /** {@inheritDoc} */
- @Override public void deployClass(Class<?> depCls) {
- this.depCls = depCls;
- }
-
- /** {@inheritDoc} */
- @Override public void updater(Updater<K, V> updater) {
- A.notNull(updater, "updater");
-
- this.updater = updater;
- }
-
- /** {@inheritDoc} */
- @Override public boolean allowOverwrite() {
- return updater != ISOLATED_UPDATER;
- }
-
- /** {@inheritDoc} */
- @Override public void allowOverwrite(boolean allow) {
- if (allow == allowOverwrite())
- return;
-
- ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
- if (node == null)
- throw new IgniteException("Failed to get node for cache: " + cacheName);
-
- updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return skipStore;
- }
-
- /** {@inheritDoc} */
- @Override public void skipStore(boolean skipStore) {
- this.skipStore = skipStore;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public String cacheName() {
- return cacheName;
- }
-
- /** {@inheritDoc} */
- @Override public int perNodeBufferSize() {
- return bufSize;
- }
-
- /** {@inheritDoc} */
- @Override public void perNodeBufferSize(int bufSize) {
- A.ensure(bufSize > 0, "bufSize > 0");
-
- this.bufSize = bufSize;
- }
-
- /** {@inheritDoc} */
- @Override public int perNodeParallelLoadOperations() {
- return parallelOps;
- }
-
- /** {@inheritDoc} */
- @Override public void perNodeParallelLoadOperations(int parallelOps) {
- this.parallelOps = parallelOps;
- }
-
- /** {@inheritDoc} */
- @Override public long autoFlushFrequency() {
- return autoFlushFreq;
- }
-
- /** {@inheritDoc} */
- @Override public void autoFlushFrequency(long autoFlushFreq) {
- A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
-
- long old = this.autoFlushFreq;
-
- if (autoFlushFreq != old) {
- this.autoFlushFreq = autoFlushFreq;
-
- if (autoFlushFreq != 0 && old == 0)
- flushQ.add(this);
- else if (autoFlushFreq == 0)
- flushQ.remove(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
- A.notNull(entries, "entries");
-
- return addData(entries.entrySet());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
- A.notEmpty(entries, "entries");
-
- enterBusy();
-
- try {
- GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
-
- resFut.listenAsync(rmvActiveFut);
-
- activeFuts.add(resFut);
-
- Collection<K> keys = null;
-
- if (entries.size() > 1) {
- keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
-
- for (Map.Entry<K, V> entry : entries)
- keys.add(entry.getKey());
- }
-
- load0(entries, resFut, keys, 0);
-
- return new IgniteFutureImpl<>(resFut);
- }
- catch (IgniteException e) {
- return new IgniteFinishedFutureImpl<>(ctx, e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
- A.notNull(entry, "entry");
-
- return addData(F.asList(entry));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(K key, V val) {
- A.notNull(key, "key");
-
- return addData(new Entry0<>(key, val));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> removeData(K key) {
- return addData(key, null);
- }
-
- /**
- * @param entries Entries.
- * @param resFut Result future.
- * @param activeKeys Active keys.
- * @param remaps Remaps count.
- */
- private void load0(
- Collection<? extends Map.Entry<K, V>> entries,
- final GridFutureAdapter<Object> resFut,
- @Nullable final Collection<K> activeKeys,
- final int remaps
- ) {
- assert entries != null;
-
- Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
-
- boolean initPda = ctx.deploy().enabled() && jobPda == null;
-
- for (Map.Entry<K, V> entry : entries) {
- List<ClusterNode> nodes;
-
- try {
- K key = entry.getKey();
-
- assert key != null;
-
- if (initPda) {
- jobPda = new DataStreamerPda(key, entry.getValue(), updater);
-
- initPda = false;
- }
-
- nodes = nodes(key);
- }
- catch (IgniteCheckedException e) {
- resFut.onDone(e);
-
- return;
- }
-
- if (F.isEmpty(nodes)) {
- resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
- "(no nodes with cache found in topology) [infos=" + entries.size() +
- ", cacheName=" + cacheName + ']'));
-
- return;
- }
-
- for (ClusterNode node : nodes) {
- Collection<Map.Entry<K, V>> col = mappings.get(node);
-
- if (col == null)
- mappings.put(node, col = new ArrayList<>());
-
- col.add(entry);
- }
- }
-
- for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
- final UUID nodeId = e.getKey().id();
-
- Buffer buf = bufMappings.get(nodeId);
-
- if (buf == null) {
- Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
-
- if (old != null)
- buf = old;
- }
-
- final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
-
- IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- try {
- t.get();
-
- if (activeKeys != null) {
- for (Map.Entry<K, V> e : entriesForNode)
- activeKeys.remove(e.getKey());
-
- if (activeKeys.isEmpty())
- resFut.onDone();
- }
- else {
- assert entriesForNode.size() == 1;
-
- // That has been a single key,
- // so complete result future right away.
- resFut.onDone();
- }
- }
- catch (IgniteCheckedException e1) {
- if (log.isDebugEnabled())
- log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
-
- if (cancelled) {
- resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
- IgniteDataStreamerImpl.this, e1));
- }
- else if (remaps + 1 > maxRemapCnt) {
- resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
- + remaps), e1);
- }
- else
- load0(entriesForNode, resFut, activeKeys, remaps + 1);
- }
- }
- };
-
- GridFutureAdapter<?> f;
-
- try {
- f = buf.update(entriesForNode, lsnr);
- }
- catch (IgniteInterruptedCheckedException e1) {
- resFut.onDone(e1);
-
- return;
- }
-
- if (ctx.discovery().node(nodeId) == null) {
- if (bufMappings.remove(nodeId, buf))
- buf.onNodeLeft();
-
- if (f != null)
- f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + nodeId));
- }
- }
- }
-
- /**
- * @param key Key to map.
- * @return Nodes to send requests to.
- * @throws IgniteCheckedException If failed.
- */
- private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
- GridAffinityProcessor aff = ctx.affinity();
-
- return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
- Collections.singletonList(aff.mapKeyToNode(cacheName, key));
- }
-
- /**
- * Performs flush.
- *
- * @throws IgniteCheckedException If failed.
- */
- private void doFlush() throws IgniteCheckedException {
- lastFlushTime = U.currentTimeMillis();
-
- List<IgniteInternalFuture> activeFuts0 = null;
-
- int doneCnt = 0;
-
- for (IgniteInternalFuture<?> f : activeFuts) {
- if (!f.isDone()) {
- if (activeFuts0 == null)
- activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
-
- activeFuts0.add(f);
- }
- else {
- f.get();
-
- doneCnt++;
- }
- }
-
- if (activeFuts0 == null || activeFuts0.isEmpty())
- return;
-
- while (true) {
- Queue<IgniteInternalFuture<?>> q = null;
-
- for (Buffer buf : bufMappings.values()) {
- IgniteInternalFuture<?> flushFut = buf.flush();
-
- if (flushFut != null) {
- if (q == null)
- q = new ArrayDeque<>(bufMappings.size() * 2);
-
- q.add(flushFut);
- }
- }
-
- if (q != null) {
- assert !q.isEmpty();
-
- boolean err = false;
-
- for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
- try {
- fut.get();
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to flush buffer: " + e);
-
- err = true;
- }
- }
-
- if (err)
- // Remaps needed - flush buffers.
- continue;
- }
-
- doneCnt = 0;
-
- for (int i = 0; i < activeFuts0.size(); i++) {
- IgniteInternalFuture f = activeFuts0.get(i);
-
- if (f == null)
- doneCnt++;
- else if (f.isDone()) {
- f.get();
-
- doneCnt++;
-
- activeFuts0.set(i, null);
- }
- else
- break;
- }
-
- if (doneCnt == activeFuts0.size())
- return;
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void flush() throws IgniteException {
- enterBusy();
-
- try {
- doFlush();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * Flushes every internal buffer if buffer was flushed before passed in
- * threshold.
- * <p>
- * Does not wait for result and does not fail on errors assuming that this method
- * should be called periodically.
- */
- @Override public void tryFlush() throws IgniteInterruptedException {
- if (!busyLock.enterBusy())
- return;
-
- try {
- for (Buffer buf : bufMappings.values())
- buf.flush();
-
- lastFlushTime = U.currentTimeMillis();
- }
- catch (IgniteInterruptedCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * @param cancel {@code True} to close with cancellation.
- * @throws IgniteException If failed.
- */
- @Override public void close(boolean cancel) throws IgniteException {
- try {
- closeEx(cancel);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /**
- * @param cancel {@code True} to close with cancellation.
- * @throws IgniteCheckedException If failed.
- */
- public void closeEx(boolean cancel) throws IgniteCheckedException {
- if (!closed.compareAndSet(false, true))
- return;
-
- busyLock.block();
-
- if (log.isDebugEnabled())
- log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
-
- IgniteCheckedException e = null;
-
- try {
- // Assuming that no methods are called on this loader after this method is called.
- if (cancel) {
- cancelled = true;
-
- for (Buffer buf : bufMappings.values())
- buf.cancelAll();
- }
- else
- doFlush();
-
- ctx.event().removeLocalEventListener(discoLsnr);
-
- ctx.io().removeMessageListener(topic);
- }
- catch (IgniteCheckedException e0) {
- e = e0;
- }
-
- fut.onDone(null, e);
-
- if (e != null)
- throw e;
- }
-
- /**
- * @return {@code true} If the loader is closed.
- */
- boolean isClosed() {
- return fut.isDone();
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteException {
- close(false);
- }
-
- /**
- * @return Max remap count.
- */
- public int maxRemapCount() {
- return maxRemapCnt;
- }
-
- /**
- * @param maxRemapCnt New max remap count.
- */
- public void maxRemapCount(int maxRemapCnt) {
- this.maxRemapCnt = maxRemapCnt;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteDataStreamerImpl.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public long getDelay(TimeUnit unit) {
- return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
- }
-
- /**
- * @return Next flush time.
- */
- private long nextFlushTime() {
- return lastFlushTime + autoFlushFreq;
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(Delayed o) {
- return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1;
- }
-
- /**
- *
- */
- private class Buffer {
- /** Node. */
- private final ClusterNode node;
-
- /** Active futures. */
- private final Collection<IgniteInternalFuture<Object>> locFuts;
-
- /** Buffered entries. */
- private List<Map.Entry<K, V>> entries;
-
- /** */
- @GridToStringExclude
- private GridFutureAdapter<Object> curFut;
-
- /** Local node flag. */
- private final boolean isLocNode;
-
- /** ID generator. */
- private final AtomicLong idGen = new AtomicLong();
-
- /** Active futures. */
- private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
-
- /** */
- private final Semaphore sem;
-
- /** Closure to signal on task finish. */
- @GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- signalTaskFinished(t);
- }
- };
-
- /**
- * @param node Node.
- */
- Buffer(ClusterNode node) {
- assert node != null;
-
- this.node = node;
-
- locFuts = new GridConcurrentHashSet<>();
- reqs = new ConcurrentHashMap8<>();
-
- // Cache local node flag.
- isLocNode = node.equals(ctx.discovery().localNode());
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
-
- sem = new Semaphore(parallelOps);
- }
-
- /**
- * @param newEntries Infos.
- * @param lsnr Listener for the operation future.
- * @throws IgniteInterruptedCheckedException If failed.
- * @return Future for operation.
- */
- @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
- IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
- List<Map.Entry<K, V>> entries0 = null;
- GridFutureAdapter<Object> curFut0;
-
- synchronized (this) {
- curFut0 = curFut;
-
- curFut0.listenAsync(lsnr);
-
- for (Map.Entry<K, V> entry : newEntries)
- entries.add(entry);
-
- if (entries.size() >= bufSize) {
- entries0 = entries;
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
- }
- }
-
- if (entries0 != null) {
- submit(entries0, curFut0);
-
- if (cancelled)
- curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this));
- }
-
- return curFut0;
- }
-
- /**
- * @return Fresh collection with some space for outgrowth.
- */
- private List<Map.Entry<K, V>> newEntries() {
- return new ArrayList<>((int)(bufSize * 1.2));
- }
-
- /**
- * @return Future if any submitted.
- *
- * @throws IgniteInterruptedCheckedException If thread has been interrupted.
- */
- @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
- List<Map.Entry<K, V>> entries0 = null;
- GridFutureAdapter<Object> curFut0 = null;
-
- synchronized (this) {
- if (!entries.isEmpty()) {
- entries0 = entries;
- curFut0 = curFut;
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
- }
- }
-
- if (entries0 != null)
- submit(entries0, curFut0);
-
- // Create compound future for this flush.
- GridCompoundFuture<Object, Object> res = null;
-
- for (IgniteInternalFuture<Object> f : locFuts) {
- if (res == null)
- res = new GridCompoundFuture<>(ctx);
-
- res.add(f);
- }
-
- for (IgniteInternalFuture<Object> f : reqs.values()) {
- if (res == null)
- res = new GridCompoundFuture<>(ctx);
-
- res.add(f);
- }
-
- if (res != null)
- res.markInitialized();
-
- return res;
- }
-
- /**
- * Increments active tasks count.
- *
- * @throws IgniteInterruptedCheckedException If thread has been interrupted.
- */
- private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
- U.acquire(sem);
- }
-
- /**
- * @param f Future that finished.
- */
- private void signalTaskFinished(IgniteInternalFuture<Object> f) {
- assert f != null;
-
- sem.release();
- }
-
- /**
- * @param entries Entries to submit.
- * @param curFut Current future.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
- throws IgniteInterruptedCheckedException {
- assert entries != null;
- assert !entries.isEmpty();
- assert curFut != null;
-
- incrementActiveTasks();
-
- IgniteInternalFuture<Object> fut;
-
- if (isLocNode) {
- fut = ctx.closure().callLocalSafe(
- new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
-
- locFuts.add(fut);
-
- fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- try {
- boolean rmv = locFuts.remove(t);
-
- assert rmv;
-
- curFut.onDone(t.get());
- }
- catch (IgniteCheckedException e) {
- curFut.onDone(e);
- }
- }
- });
- }
- else {
- byte[] entriesBytes;
-
- try {
- if (compact) {
- entriesBytes = ctx.config().getMarshaller()
- .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
- }
- else
- entriesBytes = ctx.config().getMarshaller().marshal(entries);
-
- if (updaterBytes == null) {
- assert updater != null;
-
- updaterBytes = ctx.config().getMarshaller().marshal(updater);
- }
-
- if (topicBytes == null)
- topicBytes = ctx.config().getMarshaller().marshal(topic);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal (request will not be sent).", e);
-
- return;
- }
-
- GridDeployment dep = null;
- GridPeerDeployAware jobPda0 = null;
-
- if (ctx.deploy().enabled()) {
- try {
- jobPda0 = jobPda;
-
- assert jobPda0 != null;
-
- dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
-
- GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-
- if (cache != null)
- cache.context().deploy().onEnter();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
-
- return;
- }
-
- if (dep == null)
- U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
- }
-
- long reqId = idGen.incrementAndGet();
-
- fut = curFut;
-
- reqs.put(reqId, (GridFutureAdapter<Object>)fut);
-
- GridDataLoadRequest req = new GridDataLoadRequest(
- reqId,
- topicBytes,
- cacheName,
- updaterBytes,
- entriesBytes,
- true,
- skipStore,
- dep != null ? dep.deployMode() : null,
- dep != null ? jobPda0.deployClass().getName() : null,
- dep != null ? dep.userVersion() : null,
- dep != null ? dep.participants() : null,
- dep != null ? dep.classLoaderId() : null,
- dep == null);
-
- try {
- ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
-
- if (log.isDebugEnabled())
- log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
- }
- catch (IgniteCheckedException e) {
- if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
- ((GridFutureAdapter<Object>)fut).onDone(e);
- else
- ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
- "request (node has left): " + node.id()));
- }
- }
- }
-
- /**
- *
- */
- void onNodeLeft() {
- assert !isLocNode;
- assert bufMappings.get(node.id()) != this;
-
- if (log.isDebugEnabled())
- log.debug("Forcibly completing futures (node has left): " + node.id());
-
- Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + node.id());
-
- for (GridFutureAdapter<Object> f : reqs.values())
- f.onDone(e);
-
- // Make sure to complete current future.
- GridFutureAdapter<Object> curFut0;
-
- synchronized (this) {
- curFut0 = curFut;
- }
-
- curFut0.onDone(e);
- }
-
- /**
- * @param res Response.
- */
- void onResponse(GridDataLoadResponse res) {
- if (log.isDebugEnabled())
- log.debug("Received data load response: " + res);
-
- GridFutureAdapter<?> f = reqs.remove(res.requestId());
-
- if (f == null) {
- if (log.isDebugEnabled())
- log.debug("Future for request has not been found: " + res.requestId());
-
- return;
- }
-
- Throwable err = null;
-
- byte[] errBytes = res.errorBytes();
-
- if (errBytes != null) {
- try {
- GridPeerDeployAware jobPda0 = jobPda;
-
- err = ctx.config().getMarshaller().unmarshal(
- errBytes,
- jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
- }
- catch (IgniteCheckedException e) {
- f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
-
- return;
- }
- }
-
- f.onDone(null, err);
-
- if (log.isDebugEnabled())
- log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
- }
-
- /**
- *
- */
- void cancelAll() {
- IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this);
-
- for (IgniteInternalFuture<?> f : locFuts) {
- try {
- f.cancel();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to cancel mini-future.", e);
- }
- }
-
- for (GridFutureAdapter<?> f : reqs.values())
- f.onDone(err);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- int size;
-
- synchronized (this) {
- size = entries.size();
- }
-
- return S.toString(Buffer.class, this,
- "entriesCnt", size,
- "locFutsSize", locFuts.size(),
- "reqsSize", reqs.size());
- }
- }
-
- /**
- * Data streamer peer-deploy aware.
- */
- private class DataStreamerPda implements GridPeerDeployAware {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Deploy class. */
- private Class<?> cls;
-
- /** Class loader. */
- private ClassLoader ldr;
-
- /** Collection of objects to detect deploy class and class loader. */
- private Collection<Object> objs;
-
- /**
- * Constructs data streamer peer-deploy aware.
- *
- * @param objs Collection of objects to detect deploy class and class loader.
- */
- private DataStreamerPda(Object... objs) {
- this.objs = Arrays.asList(objs);
- }
-
- /** {@inheritDoc} */
- @Override public Class<?> deployClass() {
- if (cls == null) {
- Class<?> cls0 = null;
-
- if (depCls != null)
- cls0 = depCls;
- else {
- for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
- Object o = it.next();
-
- if (o != null)
- cls0 = U.detectClass(o);
- }
-
- if (cls0 == null || U.isJdk(cls0))
- cls0 = IgniteDataStreamerImpl.class;
- }
-
- assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
-
- cls = cls0;
- }
-
- return cls;
- }
-
- /** {@inheritDoc} */
- @Override public ClassLoader classLoader() {
- if (ldr == null) {
- ClassLoader ldr0 = deployClass().getClassLoader();
-
- // Safety.
- if (ldr0 == null)
- ldr0 = U.gridClassLoader();
-
- assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
-
- ldr = ldr0;
- }
-
- return ldr;
- }
- }
-
- /**
- * Entry.
- */
- private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private K key;
-
- /** */
- private V val;
-
- /**
- * @param key Key.
- * @param val Value.
- */
- private Entry0(K key, @Nullable V val) {
- assert key != null;
-
- this.key = key;
- this.val = val;
- }
-
- /**
- * For {@link Externalizable}.
- */
- @SuppressWarnings("UnusedDeclaration")
- public Entry0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public K getKey() {
- return key;
- }
-
- /** {@inheritDoc} */
- @Override public V getValue() {
- return val;
- }
-
- /** {@inheritDoc} */
- @Override public V setValue(V val) {
- V old = this.val;
-
- this.val = val;
-
- return old;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(key);
- out.writeObject(val);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- key = (K)in.readObject();
- val = (V)in.readObject();
- }
- }
-
- /**
- * Wrapper list with special compact serialization of map entries.
- */
- private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Wrapped delegate. */
- private Collection<Map.Entry<K, V>> delegate;
-
- /** Optional portable processor for converting values. */
- private GridPortableProcessor portable;
-
- /**
- * @param delegate Delegate.
- * @param portable Portable processor.
- */
- private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
- this.delegate = delegate;
- this.portable = portable;
- }
-
- /**
- * For {@link Externalizable}.
- */
- public Entries0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Entry<K, V>> iterator() {
- return delegate.iterator();
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return delegate.size();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(delegate.size());
-
- boolean portableEnabled = portable != null;
-
- for (Map.Entry<K, V> entry : delegate) {
- if (portableEnabled) {
- out.writeObject(portable.marshalToPortable(entry.getKey()));
- out.writeObject(portable.marshalToPortable(entry.getValue()));
- }
- else {
- out.writeObject(entry.getKey());
- out.writeObject(entry.getValue());
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- int sz = in.readInt();
-
- delegate = new ArrayList<>(sz);
-
- for (int i = 0; i < sz; i++) {
- Object k = in.readObject();
- Object v = in.readObject();
-
- delegate.add(new Entry0<>((K)k, (V)v));
- }
- }
- }
-
- /**
- * Isolated updater which only loads entry initial value.
- */
- private static class IsolatedUpdater<K, V> implements Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
-
- GridCacheAdapter<K, V> internalCache = proxy.context().cache();
-
- if (internalCache.isNear())
- internalCache = internalCache.context().near().dht();
-
- GridCacheContext<K, V> cctx = internalCache.context();
-
- long topVer = cctx.affinity().affinityTopologyVersion();
-
- GridCacheVersion ver = cctx.versions().next(topVer);
-
- boolean portable = cctx.portableEnabled();
-
- for (Map.Entry<K, V> e : entries) {
- try {
- K key = e.getKey();
- V val = e.getValue();
-
- if (portable) {
- key = (K)cctx.marshalToPortable(key);
- val = (V)cctx.marshalToPortable(val);
- }
-
- GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
-
- entry.unswap(true, false);
-
- entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
- GridDrType.DR_LOAD);
-
- cctx.evicts().touch(entry, topVer);
- }
- catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
- // No-op.
- }
- catch (IgniteCheckedException ex) {
- IgniteLogger log = cache.unwrap(Ignite.class).log();
-
- U.error(log, "Failed to set initial value for cache entry: " + e, ex);
- }
- }
- }
- }
-}
[3/5] incubator-ignite git commit: # gg-9869
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
new file mode 100644
index 0000000..9ac3ec8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerImpl.java
@@ -0,0 +1,1453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.processors.dr.*;
+import org.apache.ignite.internal.processors.portable.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.Map.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ * Data streamer implementation.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
+ /** Isolated updater. */
+ private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
+ /** Cache updater. */
+ private Updater<K, V> updater = ISOLATED_UPDATER;
+
+ /** */
+ private byte[] updaterBytes;
+
+ /** Max remap count before issuing an error. */
+ private static final int DFLT_MAX_REMAP_CNT = 32;
+
+ /** Log reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Cache name ({@code null} for default cache). */
+ private final String cacheName;
+
+ /** Portable enabled flag. */
+ private final boolean portableEnabled;
+
+ /**
+ * If {@code true} then data will be transferred in compact format (only keys and values).
+ * Otherwise full map entry will be transferred (this is requires by DR internal logic).
+ */
+ private final boolean compact;
+
+ /** Per-node buffer size. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+ /** */
+ private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+ /** */
+ private long autoFlushFreq;
+
+ /** Mapping. */
+ @GridToStringInclude
+ private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Discovery listener. */
+ private final GridLocalEventListener discoLsnr;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /** Communication topic for responses. */
+ private final Object topic;
+
+ /** */
+ private byte[] topicBytes;
+
+ /** {@code True} if data streamer has been cancelled. */
+ private volatile boolean cancelled;
+
+ /** Active futures of this data streamer. */
+ @GridToStringInclude
+ private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
+
+ /** Closure to remove from active futures. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ boolean rmv = activeFuts.remove(t);
+
+ assert rmv;
+ }
+ };
+
+ /** Job peer deploy aware. */
+ private volatile GridPeerDeployAware jobPda;
+
+ /** Deployment class. */
+ private Class<?> depCls;
+
+ /** Future to track loading finish. */
+ private final GridFutureAdapter<?> fut;
+
+ /** Public API future to track loading finish. */
+ private final IgniteFuture<?> publicFut;
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /** */
+ private volatile long lastFlushTime = U.currentTimeMillis();
+
+ /** */
+ private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
+
+ /** */
+ private boolean skipStore;
+
+ /** */
+ private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+
+ /**
+ * @param ctx Grid kernal context.
+ * @param cacheName Cache name.
+ * @param flushQ Flush queue.
+ * @param compact If {@code true} data is transferred in compact mode (only keys and values).
+ * Otherwise full map entry will be transferred (this is required by DR internal logic).
+ */
+ public IgniteDataStreamerImpl(
+ final GridKernalContext ctx,
+ @Nullable final String cacheName,
+ DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
+ boolean compact
+ ) {
+ assert ctx != null;
+
+ this.ctx = ctx;
+ this.cacheName = cacheName;
+ this.flushQ = flushQ;
+ this.compact = compact;
+
+ log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
+
+ ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+ if (node == null)
+ throw new IllegalStateException("Cache doesn't exist: " + cacheName);
+
+ portableEnabled = ctx.portable().portableEnabled(node, cacheName);
+
+ discoLsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
+
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID id = discoEvt.eventNode().id();
+
+ // Remap regular mappings.
+ final Buffer buf = bufMappings.remove(id);
+
+ if (buf != null) {
+ // Only async notification is possible since
+ // discovery thread may be trapped otherwise.
+ ctx.closure().callLocalSafe(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ buf.onNodeLeft();
+
+ return null;
+ }
+ },
+ true /* system pool */
+ );
+ }
+ }
+ };
+
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ // Generate unique topic for this loader.
+ topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+ ctx.io().addMessageListener(topic, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof GridDataLoadResponse;
+
+ GridDataLoadResponse res = (GridDataLoadResponse)msg;
+
+ if (log.isDebugEnabled())
+ log.debug("Received data load response: " + res);
+
+ Buffer buf = bufMappings.get(nodeId);
+
+ if (buf != null)
+ buf.onResponse(res);
+
+ else if (log.isDebugEnabled())
+ log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", ");
+ }
+ });
+
+ if (log.isDebugEnabled())
+ log.debug("Added response listener within topic: " + topic);
+
+ fut = new IgniteDataStreamerFuture(ctx, this);
+
+ publicFut = new IgniteFutureImpl<>(fut);
+ }
+
+ /**
+ * Enters busy lock.
+ */
+ private void enterBusy() {
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Data streamer has been closed.");
+ }
+
+ /**
+ * Leaves busy lock.
+ */
+ private void leaveBusy() {
+ busyLock.leaveBusy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> future() {
+ return publicFut;
+ }
+
+ /**
+ * @return Internal future.
+ */
+ public IgniteInternalFuture<?> internalFuture() {
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deployClass(Class<?> depCls) {
+ this.depCls = depCls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updater(Updater<K, V> updater) {
+ A.notNull(updater, "updater");
+
+ this.updater = updater;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowOverwrite() {
+ return updater != ISOLATED_UPDATER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void allowOverwrite(boolean allow) {
+ if (allow == allowOverwrite())
+ return;
+
+ ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+ if (node == null)
+ throw new IgniteException("Failed to get node for cache: " + cacheName);
+
+ updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipStore() {
+ return skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void skipStore(boolean skipStore) {
+ this.skipStore = skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public String cacheName() {
+ return cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int perNodeBufferSize() {
+ return bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void perNodeBufferSize(int bufSize) {
+ A.ensure(bufSize > 0, "bufSize > 0");
+
+ this.bufSize = bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int perNodeParallelLoadOperations() {
+ return parallelOps;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void perNodeParallelLoadOperations(int parallelOps) {
+ this.parallelOps = parallelOps;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long autoFlushFrequency() {
+ return autoFlushFreq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void autoFlushFrequency(long autoFlushFreq) {
+ A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+
+ long old = this.autoFlushFreq;
+
+ if (autoFlushFreq != old) {
+ this.autoFlushFreq = autoFlushFreq;
+
+ if (autoFlushFreq != 0 && old == 0)
+ flushQ.add(this);
+ else if (autoFlushFreq == 0)
+ flushQ.remove(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+ A.notNull(entries, "entries");
+
+ return addData(entries.entrySet());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+ A.notEmpty(entries, "entries");
+
+ enterBusy();
+
+ try {
+ GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
+
+ resFut.listenAsync(rmvActiveFut);
+
+ activeFuts.add(resFut);
+
+ Collection<K> keys = null;
+
+ if (entries.size() > 1) {
+ keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+ for (Map.Entry<K, V> entry : entries)
+ keys.add(entry.getKey());
+ }
+
+ load0(entries, resFut, keys, 0);
+
+ return new IgniteFutureImpl<>(resFut);
+ }
+ catch (IgniteException e) {
+ return new IgniteFinishedFutureImpl<>(ctx, e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+ A.notNull(entry, "entry");
+
+ return addData(F.asList(entry));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(K key, V val) {
+ A.notNull(key, "key");
+
+ return addData(new Entry0<>(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> removeData(K key) {
+ return addData(key, null);
+ }
+
+ /**
+ * @param entries Entries.
+ * @param resFut Result future.
+ * @param activeKeys Active keys.
+ * @param remaps Remaps count.
+ */
+ private void load0(
+ Collection<? extends Map.Entry<K, V>> entries,
+ final GridFutureAdapter<Object> resFut,
+ @Nullable final Collection<K> activeKeys,
+ final int remaps
+ ) {
+ assert entries != null;
+
+ Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
+
+ boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+ for (Map.Entry<K, V> entry : entries) {
+ List<ClusterNode> nodes;
+
+ try {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ if (initPda) {
+ jobPda = new DataStreamerPda(key, entry.getValue(), updater);
+
+ initPda = false;
+ }
+
+ nodes = nodes(key);
+ }
+ catch (IgniteCheckedException e) {
+ resFut.onDone(e);
+
+ return;
+ }
+
+ if (F.isEmpty(nodes)) {
+ resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
+ "(no nodes with cache found in topology) [infos=" + entries.size() +
+ ", cacheName=" + cacheName + ']'));
+
+ return;
+ }
+
+ for (ClusterNode node : nodes) {
+ Collection<Map.Entry<K, V>> col = mappings.get(node);
+
+ if (col == null)
+ mappings.put(node, col = new ArrayList<>());
+
+ col.add(entry);
+ }
+ }
+
+ for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
+ final UUID nodeId = e.getKey().id();
+
+ Buffer buf = bufMappings.get(nodeId);
+
+ if (buf == null) {
+ Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
+
+ if (old != null)
+ buf = old;
+ }
+
+ final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
+
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ try {
+ t.get();
+
+ if (activeKeys != null) {
+ for (Map.Entry<K, V> e : entriesForNode)
+ activeKeys.remove(e.getKey());
+
+ if (activeKeys.isEmpty())
+ resFut.onDone();
+ }
+ else {
+ assert entriesForNode.size() == 1;
+
+ // That has been a single key,
+ // so complete result future right away.
+ resFut.onDone();
+ }
+ }
+ catch (IgniteCheckedException e1) {
+ if (log.isDebugEnabled())
+ log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+ if (cancelled) {
+ resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+ IgniteDataStreamerImpl.this, e1));
+ }
+ else if (remaps + 1 > maxRemapCnt) {
+ resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
+ + remaps), e1);
+ }
+ else
+ load0(entriesForNode, resFut, activeKeys, remaps + 1);
+ }
+ }
+ };
+
+ GridFutureAdapter<?> f;
+
+ try {
+ f = buf.update(entriesForNode, lsnr);
+ }
+ catch (IgniteInterruptedCheckedException e1) {
+ resFut.onDone(e1);
+
+ return;
+ }
+
+ if (ctx.discovery().node(nodeId) == null) {
+ if (bufMappings.remove(nodeId, buf))
+ buf.onNodeLeft();
+
+ if (f != null)
+ f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
+ "(node has left): " + nodeId));
+ }
+ }
+ }
+
+ /**
+ * @param key Key to map.
+ * @return Nodes to send requests to.
+ * @throws IgniteCheckedException If failed.
+ */
+ private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
+ GridAffinityProcessor aff = ctx.affinity();
+
+ return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
+ Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+ }
+
+ /**
+ * Performs flush.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doFlush() throws IgniteCheckedException {
+ lastFlushTime = U.currentTimeMillis();
+
+ List<IgniteInternalFuture> activeFuts0 = null;
+
+ int doneCnt = 0;
+
+ for (IgniteInternalFuture<?> f : activeFuts) {
+ if (!f.isDone()) {
+ if (activeFuts0 == null)
+ activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
+
+ activeFuts0.add(f);
+ }
+ else {
+ f.get();
+
+ doneCnt++;
+ }
+ }
+
+ if (activeFuts0 == null || activeFuts0.isEmpty())
+ return;
+
+ while (true) {
+ Queue<IgniteInternalFuture<?>> q = null;
+
+ for (Buffer buf : bufMappings.values()) {
+ IgniteInternalFuture<?> flushFut = buf.flush();
+
+ if (flushFut != null) {
+ if (q == null)
+ q = new ArrayDeque<>(bufMappings.size() * 2);
+
+ q.add(flushFut);
+ }
+ }
+
+ if (q != null) {
+ assert !q.isEmpty();
+
+ boolean err = false;
+
+ for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to flush buffer: " + e);
+
+ err = true;
+ }
+ }
+
+ if (err)
+ // Remaps needed - flush buffers.
+ continue;
+ }
+
+ doneCnt = 0;
+
+ for (int i = 0; i < activeFuts0.size(); i++) {
+ IgniteInternalFuture f = activeFuts0.get(i);
+
+ if (f == null)
+ doneCnt++;
+ else if (f.isDone()) {
+ f.get();
+
+ doneCnt++;
+
+ activeFuts0.set(i, null);
+ }
+ else
+ break;
+ }
+
+ if (doneCnt == activeFuts0.size())
+ return;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void flush() throws IgniteException {
+ enterBusy();
+
+ try {
+ doFlush();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * Flushes every internal buffer if buffer was flushed before passed in
+ * threshold.
+ * <p>
+ * Does not wait for result and does not fail on errors assuming that this method
+ * should be called periodically.
+ */
+ @Override public void tryFlush() throws IgniteInterruptedException {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ for (Buffer buf : bufMappings.values())
+ buf.flush();
+
+ lastFlushTime = U.currentTimeMillis();
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * @param cancel {@code True} to close with cancellation.
+ * @throws IgniteException If failed.
+ */
+ @Override public void close(boolean cancel) throws IgniteException {
+ try {
+ closeEx(cancel);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+ * @param cancel {@code True} to close with cancellation.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void closeEx(boolean cancel) throws IgniteCheckedException {
+ if (!closed.compareAndSet(false, true))
+ return;
+
+ busyLock.block();
+
+ if (log.isDebugEnabled())
+ log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
+
+ IgniteCheckedException e = null;
+
+ try {
+ // Assuming that no methods are called on this loader after this method is called.
+ if (cancel) {
+ cancelled = true;
+
+ for (Buffer buf : bufMappings.values())
+ buf.cancelAll();
+ }
+ else
+ doFlush();
+
+ ctx.event().removeLocalEventListener(discoLsnr);
+
+ ctx.io().removeMessageListener(topic);
+ }
+ catch (IgniteCheckedException e0) {
+ e = e0;
+ }
+
+ fut.onDone(null, e);
+
+ if (e != null)
+ throw e;
+ }
+
+ /**
+ * @return {@code true} If the loader is closed.
+ */
+ boolean isClosed() {
+ return fut.isDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteException {
+ close(false);
+ }
+
+ /**
+ * @return Max remap count.
+ */
+ public int maxRemapCount() {
+ return maxRemapCnt;
+ }
+
+ /**
+ * @param maxRemapCnt New max remap count.
+ */
+ public void maxRemapCount(int maxRemapCnt) {
+ this.maxRemapCnt = maxRemapCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteDataStreamerImpl.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDelay(TimeUnit unit) {
+ return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return Next flush time.
+ */
+ private long nextFlushTime() {
+ return lastFlushTime + autoFlushFreq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(Delayed o) {
+ return nextFlushTime() > ((IgniteDataStreamerImpl)o).nextFlushTime() ? 1 : -1;
+ }
+
+ /**
+ *
+ */
+ private class Buffer {
+ /** Node. */
+ private final ClusterNode node;
+
+ /** Active futures. */
+ private final Collection<IgniteInternalFuture<Object>> locFuts;
+
+ /** Buffered entries. */
+ private List<Map.Entry<K, V>> entries;
+
+ /** */
+ @GridToStringExclude
+ private GridFutureAdapter<Object> curFut;
+
+ /** Local node flag. */
+ private final boolean isLocNode;
+
+ /** ID generator. */
+ private final AtomicLong idGen = new AtomicLong();
+
+ /** Active futures. */
+ private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+ /** */
+ private final Semaphore sem;
+
+ /** Closure to signal on task finish. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ signalTaskFinished(t);
+ }
+ };
+
+ /**
+ * @param node Node.
+ */
+ Buffer(ClusterNode node) {
+ assert node != null;
+
+ this.node = node;
+
+ locFuts = new GridConcurrentHashSet<>();
+ reqs = new ConcurrentHashMap8<>();
+
+ // Cache local node flag.
+ isLocNode = node.equals(ctx.discovery().localNode());
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>(ctx);
+ curFut.listenAsync(signalC);
+
+ sem = new Semaphore(parallelOps);
+ }
+
+ /**
+ * @param newEntries Infos.
+ * @param lsnr Listener for the operation future.
+ * @throws IgniteInterruptedCheckedException If failed.
+ * @return Future for operation.
+ */
+ @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
+ List<Map.Entry<K, V>> entries0 = null;
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+
+ curFut0.listenAsync(lsnr);
+
+ for (Map.Entry<K, V> entry : newEntries)
+ entries.add(entry);
+
+ if (entries.size() >= bufSize) {
+ entries0 = entries;
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>(ctx);
+ curFut.listenAsync(signalC);
+ }
+ }
+
+ if (entries0 != null) {
+ submit(entries0, curFut0);
+
+ if (cancelled)
+ curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this));
+ }
+
+ return curFut0;
+ }
+
+ /**
+ * @return Fresh collection with some space for outgrowth.
+ */
+ private List<Map.Entry<K, V>> newEntries() {
+ return new ArrayList<>((int)(bufSize * 1.2));
+ }
+
+ /**
+ * @return Future if any submitted.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+ */
+ @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
+ List<Map.Entry<K, V>> entries0 = null;
+ GridFutureAdapter<Object> curFut0 = null;
+
+ synchronized (this) {
+ if (!entries.isEmpty()) {
+ entries0 = entries;
+ curFut0 = curFut;
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>(ctx);
+ curFut.listenAsync(signalC);
+ }
+ }
+
+ if (entries0 != null)
+ submit(entries0, curFut0);
+
+ // Create compound future for this flush.
+ GridCompoundFuture<Object, Object> res = null;
+
+ for (IgniteInternalFuture<Object> f : locFuts) {
+ if (res == null)
+ res = new GridCompoundFuture<>(ctx);
+
+ res.add(f);
+ }
+
+ for (IgniteInternalFuture<Object> f : reqs.values()) {
+ if (res == null)
+ res = new GridCompoundFuture<>(ctx);
+
+ res.add(f);
+ }
+
+ if (res != null)
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
+ * Increments active tasks count.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been interrupted.
+ */
+ private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
+ U.acquire(sem);
+ }
+
+ /**
+ * @param f Future that finished.
+ */
+ private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+ assert f != null;
+
+ sem.release();
+ }
+
+ /**
+ * @param entries Entries to submit.
+ * @param curFut Current future.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
+ throws IgniteInterruptedCheckedException {
+ assert entries != null;
+ assert !entries.isEmpty();
+ assert curFut != null;
+
+ incrementActiveTasks();
+
+ IgniteInternalFuture<Object> fut;
+
+ if (isLocNode) {
+ fut = ctx.closure().callLocalSafe(
+ new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
+
+ locFuts.add(fut);
+
+ fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ try {
+ boolean rmv = locFuts.remove(t);
+
+ assert rmv;
+
+ curFut.onDone(t.get());
+ }
+ catch (IgniteCheckedException e) {
+ curFut.onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ byte[] entriesBytes;
+
+ try {
+ if (compact) {
+ entriesBytes = ctx.config().getMarshaller()
+ .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
+ }
+ else
+ entriesBytes = ctx.config().getMarshaller().marshal(entries);
+
+ if (updaterBytes == null) {
+ assert updater != null;
+
+ updaterBytes = ctx.config().getMarshaller().marshal(updater);
+ }
+
+ if (topicBytes == null)
+ topicBytes = ctx.config().getMarshaller().marshal(topic);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal (request will not be sent).", e);
+
+ return;
+ }
+
+ GridDeployment dep = null;
+ GridPeerDeployAware jobPda0 = null;
+
+ if (ctx.deploy().enabled()) {
+ try {
+ jobPda0 = jobPda;
+
+ assert jobPda0 != null;
+
+ dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+
+ GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+ if (cache != null)
+ cache.context().deploy().onEnter();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+
+ return;
+ }
+
+ if (dep == null)
+ U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+ }
+
+ long reqId = idGen.incrementAndGet();
+
+ fut = curFut;
+
+ reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+
+ GridDataLoadRequest req = new GridDataLoadRequest(
+ reqId,
+ topicBytes,
+ cacheName,
+ updaterBytes,
+ entriesBytes,
+ true,
+ skipStore,
+ dep != null ? dep.deployMode() : null,
+ dep != null ? jobPda0.deployClass().getName() : null,
+ dep != null ? dep.userVersion() : null,
+ dep != null ? dep.participants() : null,
+ dep != null ? dep.classLoaderId() : null,
+ dep == null);
+
+ try {
+ ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
+
+ if (log.isDebugEnabled())
+ log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+ }
+ catch (IgniteCheckedException e) {
+ if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+ ((GridFutureAdapter<Object>)fut).onDone(e);
+ else
+ ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
+ "request (node has left): " + node.id()));
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ void onNodeLeft() {
+ assert !isLocNode;
+ assert bufMappings.get(node.id()) != this;
+
+ if (log.isDebugEnabled())
+ log.debug("Forcibly completing futures (node has left): " + node.id());
+
+ Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
+ "(node has left): " + node.id());
+
+ for (GridFutureAdapter<Object> f : reqs.values())
+ f.onDone(e);
+
+ // Make sure to complete current future.
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+ }
+
+ curFut0.onDone(e);
+ }
+
+ /**
+ * @param res Response.
+ */
+ void onResponse(GridDataLoadResponse res) {
+ if (log.isDebugEnabled())
+ log.debug("Received data load response: " + res);
+
+ GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+ if (f == null) {
+ if (log.isDebugEnabled())
+ log.debug("Future for request has not been found: " + res.requestId());
+
+ return;
+ }
+
+ Throwable err = null;
+
+ byte[] errBytes = res.errorBytes();
+
+ if (errBytes != null) {
+ try {
+ GridPeerDeployAware jobPda0 = jobPda;
+
+ err = ctx.config().getMarshaller().unmarshal(
+ errBytes,
+ jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
+
+ return;
+ }
+ }
+
+ f.onDone(null, err);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
+ }
+
+ /**
+ *
+ */
+ void cancelAll() {
+ IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this);
+
+ for (IgniteInternalFuture<?> f : locFuts) {
+ try {
+ f.cancel();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to cancel mini-future.", e);
+ }
+ }
+
+ for (GridFutureAdapter<?> f : reqs.values())
+ f.onDone(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ int size;
+
+ synchronized (this) {
+ size = entries.size();
+ }
+
+ return S.toString(Buffer.class, this,
+ "entriesCnt", size,
+ "locFutsSize", locFuts.size(),
+ "reqsSize", reqs.size());
+ }
+ }
+
+ /**
+ * Data streamer peer-deploy aware.
+ */
+ private class DataStreamerPda implements GridPeerDeployAware {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Deploy class. */
+ private Class<?> cls;
+
+ /** Class loader. */
+ private ClassLoader ldr;
+
+ /** Collection of objects to detect deploy class and class loader. */
+ private Collection<Object> objs;
+
+ /**
+ * Constructs data streamer peer-deploy aware.
+ *
+ * @param objs Collection of objects to detect deploy class and class loader.
+ */
+ private DataStreamerPda(Object... objs) {
+ this.objs = Arrays.asList(objs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Class<?> deployClass() {
+ if (cls == null) {
+ Class<?> cls0 = null;
+
+ if (depCls != null)
+ cls0 = depCls;
+ else {
+ for (Iterator<Object> it = objs.iterator(); (cls0 == null || U.isJdk(cls0)) && it.hasNext();) {
+ Object o = it.next();
+
+ if (o != null)
+ cls0 = U.detectClass(o);
+ }
+
+ if (cls0 == null || U.isJdk(cls0))
+ cls0 = IgniteDataStreamerImpl.class;
+ }
+
+ assert cls0 != null : "Failed to detect deploy class [objs=" + objs + ']';
+
+ cls = cls0;
+ }
+
+ return cls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClassLoader classLoader() {
+ if (ldr == null) {
+ ClassLoader ldr0 = deployClass().getClassLoader();
+
+ // Safety.
+ if (ldr0 == null)
+ ldr0 = U.gridClassLoader();
+
+ assert ldr0 != null : "Failed to detect classloader [objs=" + objs + ']';
+
+ ldr = ldr0;
+ }
+
+ return ldr;
+ }
+ }
+
+ /**
+ * Entry.
+ */
+ private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private K key;
+
+ /** */
+ private V val;
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ */
+ private Entry0(K key, @Nullable V val) {
+ assert key != null;
+
+ this.key = key;
+ this.val = val;
+ }
+
+ /**
+ * For {@link Externalizable}.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public Entry0() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public K getKey() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getValue() {
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V setValue(V val) {
+ V old = this.val;
+
+ this.val = val;
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(key);
+ out.writeObject(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ key = (K)in.readObject();
+ val = (V)in.readObject();
+ }
+ }
+
+ /**
+ * Wrapper list with special compact serialization of map entries.
+ */
+ private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped delegate. */
+ private Collection<Map.Entry<K, V>> delegate;
+
+ /** Optional portable processor for converting values. */
+ private GridPortableProcessor portable;
+
+ /**
+ * @param delegate Delegate.
+ * @param portable Portable processor.
+ */
+ private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
+ this.delegate = delegate;
+ this.portable = portable;
+ }
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public Entries0() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Entry<K, V>> iterator() {
+ return delegate.iterator();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return delegate.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(delegate.size());
+
+ boolean portableEnabled = portable != null;
+
+ for (Map.Entry<K, V> entry : delegate) {
+ if (portableEnabled) {
+ out.writeObject(portable.marshalToPortable(entry.getKey()));
+ out.writeObject(portable.marshalToPortable(entry.getValue()));
+ }
+ else {
+ out.writeObject(entry.getKey());
+ out.writeObject(entry.getValue());
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ int sz = in.readInt();
+
+ delegate = new ArrayList<>(sz);
+
+ for (int i = 0; i < sz; i++) {
+ Object k = in.readObject();
+ Object v = in.readObject();
+
+ delegate.add(new Entry0<>((K)k, (V)v));
+ }
+ }
+ }
+
+ /**
+ * Isolated updater which only loads entry initial value.
+ */
+ private static class IsolatedUpdater<K, V> implements Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)cache;
+
+ GridCacheAdapter<K, V> internalCache = proxy.context().cache();
+
+ if (internalCache.isNear())
+ internalCache = internalCache.context().near().dht();
+
+ GridCacheContext<K, V> cctx = internalCache.context();
+
+ long topVer = cctx.affinity().affinityTopologyVersion();
+
+ GridCacheVersion ver = cctx.versions().next(topVer);
+
+ boolean portable = cctx.portableEnabled();
+
+ for (Map.Entry<K, V> e : entries) {
+ try {
+ K key = e.getKey();
+ V val = e.getValue();
+
+ if (portable) {
+ key = (K)cctx.marshalToPortable(key);
+ val = (V)cctx.marshalToPortable(val);
+ }
+
+ GridCacheEntryEx<K, V> entry = internalCache.entryEx(key, topVer);
+
+ entry.unswap(true, false);
+
+ entry.initialValue(val, null, ver, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
+ GridDrType.DR_LOAD);
+
+ cctx.evicts().touch(entry, topVer);
+ }
+ catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
+ // No-op.
+ }
+ catch (IgniteCheckedException ex) {
+ IgniteLogger log = cache.unwrap(Ignite.class).log();
+
+ U.error(log, "Failed to set initial value for cache entry: " + e, ex);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
new file mode 100644
index 0000000..9dd4a8e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerProcessor.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.thread.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+/**
+ *
+ */
+public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
+ /** Loaders map (access is not supposed to be highly concurrent). */
+ private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /** Flushing thread. */
+ private Thread flusher;
+
+ /** */
+ private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
+
+ /** Marshaller. */
+ private final Marshaller marsh;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public IgniteDataStreamerProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof GridDataLoadRequest;
+
+ processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
+ }
+ });
+
+ marsh = ctx.config().getMarshaller();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (ctx.config().isDaemon())
+ return;
+
+ flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
+ @Override protected void body() throws InterruptedException {
+ while (!isCancelled()) {
+ IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
+
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ if (ldr.isClosed())
+ continue;
+
+ ldr.tryFlush();
+
+ flushQ.offer(ldr);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ }
+ });
+
+ flusher.start();
+
+ if (log.isDebugEnabled())
+ log.debug("Started data streamer processor.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ if (ctx.config().isDaemon())
+ return;
+
+ ctx.io().removeMessageListener(TOPIC_DATALOAD);
+
+ busyLock.block();
+
+ U.interrupt(flusher);
+ U.join(flusher, log);
+
+ for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
+ if (log.isDebugEnabled())
+ log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
+
+ try {
+ ldr.closeEx(cancel);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close data streamer: " + ldr, e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Stopped data streamer processor.");
+ }
+
+ /**
+ * @param cacheName Cache name ({@code null} for default cache).
+ * @param compact {@code true} if data streamer should transfer data in compact format.
+ * @return Data streamer.
+ */
+ public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
+
+ try {
+ final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
+
+ ldrs.add(ldr);
+
+ ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ boolean b = ldrs.remove(ldr);
+
+ assert b : "Loader has not been added to set: " + ldr;
+
+ if (log.isDebugEnabled())
+ log.debug("Loader has been completed: " + ldr);
+ }
+ });
+
+ return ldr;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param cacheName Cache name ({@code null} for default cache).
+ * @return Data streamer.
+ */
+ public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
+ return dataStreamer(cacheName, true);
+ }
+
+ /**
+ * @param nodeId Sender ID.
+ * @param req Request.
+ */
+ private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
+ if (!busyLock.enterBusy()) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring data load request (node is stopping): " + req);
+
+ return;
+ }
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Processing data load request: " + req);
+
+ Object topic;
+
+ try {
+ topic = marsh.unmarshal(req.responseTopicBytes(), null);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal topic from request: " + req, e);
+
+ return;
+ }
+
+ ClassLoader clsLdr;
+
+ if (req.forceLocalDeployment())
+ clsLdr = U.gridClassLoader();
+ else {
+ GridDeployment dep = ctx.deploy().getGlobalDeployment(
+ req.deploymentMode(),
+ req.sampleClassName(),
+ req.sampleClassName(),
+ req.userVersion(),
+ nodeId,
+ req.classLoaderId(),
+ req.participants(),
+ null);
+
+ if (dep == null) {
+ sendResponse(nodeId,
+ topic,
+ req.requestId(),
+ new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
+ ", req=" + req + ']'),
+ false);
+
+ return;
+ }
+
+ clsLdr = dep.classLoader();
+ }
+
+ Collection<Map.Entry<K, V>> col;
+ IgniteDataStreamer.Updater<K, V> updater;
+
+ try {
+ col = marsh.unmarshal(req.collectionBytes(), clsLdr);
+ updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
+
+ sendResponse(nodeId, topic, req.requestId(), e, false);
+
+ return;
+ }
+
+ IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
+ log,
+ req.cacheName(),
+ col,
+ req.ignoreDeploymentOwnership(),
+ req.skipStore(),
+ updater);
+
+ Exception err = null;
+
+ try {
+ job.call();
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to finish update job.", e);
+
+ err = e;
+ }
+
+ sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param resTopic Response topic.
+ * @param reqId Request ID.
+ * @param err Error.
+ * @param forceLocDep Force local deployment.
+ */
+ private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
+ boolean forceLocDep) {
+ byte[] errBytes;
+
+ try {
+ errBytes = err != null ? marsh.marshal(err) : null;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal message.", e);
+
+ return;
+ }
+
+ GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
+
+ try {
+ ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ if (ctx.discovery().alive(nodeId))
+ U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
+ else if (log.isDebugEnabled())
+ log.debug("Node has left the grid: " + nodeId);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ X.println(">>>");
+ X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
+ X.println(">>> ldrsSize: " + ldrs.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java
new file mode 100644
index 0000000..9a2f3c3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerUpdateJob.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Job to put entries to cache on affinity node.
+ */
+class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
+ /** */
+ private final GridKernalContext ctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** Cache name. */
+ private final String cacheName;
+
+ /** Entries to put. */
+ private final Collection<Map.Entry<K, V>> col;
+
+ /** {@code True} to ignore deployment ownership. */
+ private final boolean ignoreDepOwnership;
+
+ /** */
+ private final boolean skipStore;
+
+ /** */
+ private final IgniteDataStreamer.Updater<K, V> updater;
+
+ /**
+ * @param ctx Context.
+ * @param log Log.
+ * @param cacheName Cache name.
+ * @param col Entries to put.
+ * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
+ * @param updater Updater.
+ */
+ IgniteDataStreamerUpdateJob(
+ GridKernalContext ctx,
+ IgniteLogger log,
+ @Nullable String cacheName,
+ Collection<Map.Entry<K, V>> col,
+ boolean ignoreDepOwnership,
+ boolean skipStore,
+ IgniteDataStreamer.Updater<K, V> updater) {
+ this.ctx = ctx;
+ this.log = log;
+
+ assert col != null && !col.isEmpty();
+ assert updater != null;
+
+ this.cacheName = cacheName;
+ this.col = col;
+ this.ignoreDepOwnership = ignoreDepOwnership;
+ this.skipStore = skipStore;
+ this.updater = updater;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ if (log.isDebugEnabled())
+ log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
+
+// TODO IGNITE-77: restore adapter usage.
+// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+//
+// IgniteFuture<?> f = cache.context().preloader().startFuture();
+//
+// if (!f.isDone())
+// f.get();
+//
+// if (ignoreDepOwnership)
+// cache.context().deploy().ignoreOwnership(true);
+
+ IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
+
+ if (skipStore)
+ cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
+
+ if (ignoreDepOwnership)
+ cache.context().deploy().ignoreOwnership(true);
+
+ try {
+ updater.update(cache, col);
+
+ return null;
+ }
+ finally {
+ if (ignoreDepOwnership)
+ cache.context().deploy().ignoreOwnership(false);
+
+ if (log.isDebugEnabled())
+ log.debug("Update job finished on node: " + ctx.localNodeId());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html
new file mode 100644
index 0000000..1090b86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/package.html
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<body>
+ <!-- Package description. -->
+ Data streamer processor.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 15309bb..6c922bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
deleted file mode 100644
index 306e615..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImplSelfTest.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.optimized.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-
-/**
- * Tests for {@code IgniteDataStreamerImpl}.
- */
-public class IgniteDataStreamerImplSelfTest extends GridCommonAbstractTest {
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Number of keys to load via data streamer. */
- private static final int KEYS_COUNT = 1000;
-
- /** Started grid counter. */
- private static int cnt;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
- discoSpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoSpi);
-
- // Forth node goes without cache.
- if (cnt < 4)
- cfg.setCacheConfiguration(cacheConfiguration());
-
- cnt++;
-
- return cfg;
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testNullPointerExceptionUponDataStreamerClosing() throws Exception {
- try {
- startGrids(5);
-
- final CyclicBarrier barrier = new CyclicBarrier(2);
-
- multithreadedAsync(new Callable<Object>() {
- @Override public Object call() throws Exception {
- U.awaitQuiet(barrier);
-
- G.stopAll(true);
-
- return null;
- }
- }, 1);
-
- Ignite g4 = grid(4);
-
- IgniteDataStreamer<Object, Object> dataLdr = g4.dataStreamer(null);
-
- dataLdr.perNodeBufferSize(32);
-
- for (int i = 0; i < 100000; i += 2) {
- dataLdr.addData(i, i);
- dataLdr.removeData(i + 1);
- }
-
- U.awaitQuiet(barrier);
-
- info("Closing data streamer.");
-
- try {
- dataLdr.close(true);
- }
- catch (IllegalStateException ignore) {
- // This is ok to ignore this exception as test is racy by it's nature -
- // grid is stopping in different thread.
- }
- }
- finally {
- G.stopAll(true);
- }
- }
-
- /**
- * Data streamer should correctly load entries from HashMap in case of grids with more than one node
- * and with GridOptimizedMarshaller that requires serializable.
- *
- * @throws Exception If failed.
- */
- public void testAddDataFromMap() throws Exception {
- try {
- cnt = 0;
-
- startGrids(2);
-
- Ignite g0 = grid(0);
-
- Marshaller marsh = g0.configuration().getMarshaller();
-
- if (marsh instanceof OptimizedMarshaller)
- assertTrue(((OptimizedMarshaller)marsh).isRequireSerializable());
- else
- fail("Expected GridOptimizedMarshaller, but found: " + marsh.getClass().getName());
-
- IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null);
-
- Map<Integer, String> map = U.newHashMap(KEYS_COUNT);
-
- for (int i = 0; i < KEYS_COUNT; i ++)
- map.put(i, String.valueOf(i));
-
- dataLdr.addData(map);
-
- dataLdr.close();
-
- Random rnd = new Random();
-
- IgniteCache<Integer, String> c = g0.jcache(null);
-
- for (int i = 0; i < KEYS_COUNT; i ++) {
- Integer k = rnd.nextInt(KEYS_COUNT);
-
- String v = c.get(k);
-
- assertEquals(k.toString(), v);
- }
- }
- finally {
- G.stopAll(true);
- }
- }
-
- /**
- * Gets cache configuration.
- *
- * @return Cache configuration.
- */
- private CacheConfiguration cacheConfiguration() {
- CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
- cacheCfg.setCacheMode(PARTITIONED);
- cacheCfg.setBackups(1);
- cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-
- return cacheCfg;
- }
-
- /**
- *
- */
- private static class TestObject implements Serializable {
- /** */
- private int val;
-
- /**
- */
- private TestObject() {
- // No-op.
- }
-
- /**
- * @param val Value.
- */
- private TestObject(int val) {
- this.val = val;
- }
-
- public Integer val() {
- return val;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return val;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- return obj instanceof TestObject && ((TestObject)obj).val == val;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
deleted file mode 100644
index 22a1f97..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerPerformanceTest.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-import org.jdk8.backport.*;
-
-import java.util.concurrent.*;
-
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Data streamer performance test. Compares group lock data streamer to traditional lock.
- * <p>
- * Disable assertions and give at least 2 GB heap to run this test.
- */
-public class IgniteDataStreamerPerformanceTest extends GridCommonAbstractTest {
- /** */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** */
- private static final int GRID_CNT = 3;
-
- /** */
- private static final int ENTRY_CNT = 80000;
-
- /** */
- private boolean useCache;
-
- /** */
- private String[] vals = new String[2048];
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
- IgniteConfiguration cfg = super.getConfiguration(gridName);
-
- TcpDiscoverySpi spi = new TcpDiscoverySpi();
-
- spi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(spi);
-
- cfg.setIncludeProperties();
-
- cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
- cfg.setConnectorConfiguration(null);
-
- cfg.setPeerClassLoadingEnabled(true);
-
- if (useCache) {
- CacheConfiguration cc = defaultCacheConfiguration();
-
- cc.setCacheMode(PARTITIONED);
-
- cc.setDistributionMode(PARTITIONED_ONLY);
- cc.setWriteSynchronizationMode(FULL_SYNC);
- cc.setStartSize(ENTRY_CNT / GRID_CNT);
- cc.setSwapEnabled(false);
-
- cc.setBackups(1);
-
- cc.setStoreValueBytes(true);
-
- cfg.setCacheSanityCheckEnabled(false);
- cfg.setCacheConfiguration(cc);
- }
- else
- cfg.setCacheConfiguration();
-
- return cfg;
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- for (int i = 0; i < vals.length; i++) {
- int valLen = ThreadLocalRandom8.current().nextInt(128, 512);
-
- StringBuilder sb = new StringBuilder();
-
- for (int j = 0; j < valLen; j++)
- sb.append('a' + ThreadLocalRandom8.current().nextInt(20));
-
- vals[i] = sb.toString();
-
- info("Value: " + vals[i]);
- }
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testPerformance() throws Exception {
- doTest();
- }
-
- /**
- * @throws Exception If failed.
- */
- private void doTest() throws Exception {
- System.gc();
- System.gc();
- System.gc();
-
- try {
- useCache = true;
-
- startGridsMultiThreaded(GRID_CNT);
-
- useCache = false;
-
- Ignite ignite = startGrid();
-
- final IgniteDataStreamer<Integer, String> ldr = ignite.dataStreamer(null);
-
- ldr.perNodeBufferSize(8192);
- ldr.updater(IgniteDataStreamerCacheUpdaters.<Integer, String>batchedSorted());
- ldr.autoFlushFrequency(0);
-
- final LongAdder cnt = new LongAdder();
-
- long start = U.currentTimeMillis();
-
- Thread t = new Thread(new Runnable() {
- @SuppressWarnings("BusyWait")
- @Override public void run() {
- while (true) {
- try {
- Thread.sleep(10000);
- }
- catch (InterruptedException ignored) {
- break;
- }
-
- info(">>> Adds/sec: " + cnt.sumThenReset() / 10);
- }
- }
- });
-
- t.setDaemon(true);
-
- t.start();
-
- int threadNum = 2;//Runtime.getRuntime().availableProcessors();
-
- multithreaded(new Callable<Object>() {
- @SuppressWarnings("InfiniteLoopStatement")
- @Override public Object call() throws Exception {
- ThreadLocalRandom8 rnd = ThreadLocalRandom8.current();
-
- while (true) {
- int i = rnd.nextInt(ENTRY_CNT);
-
- ldr.addData(i, vals[rnd.nextInt(vals.length)]);
-
- cnt.increment();
- }
- }
- }, threadNum, "loader");
-
- info("Closing loader...");
-
- ldr.close(false);
-
- long duration = U.currentTimeMillis() - start;
-
- info("Finished performance test. Duration: " + duration + "ms.");
- }
- finally {
- stopAllGrids();
- }
- }
-}
[4/5] incubator-ignite git commit: # gg-9869
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
deleted file mode 100644
index 7db41e6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- *
- */
-public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
- /** Loaders map (access is not supposed to be highly concurrent). */
- private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /** Flushing thread. */
- private Thread flusher;
-
- /** */
- private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
-
- /** Marshaller. */
- private final Marshaller marsh;
-
- /**
- * @param ctx Kernal context.
- */
- public IgniteDataStreamerProcessor(GridKernalContext ctx) {
- super(ctx);
-
- ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof GridDataLoadRequest;
-
- processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
- }
- });
-
- marsh = ctx.config().getMarshaller();
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- if (ctx.config().isDaemon())
- return;
-
- flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
- @Override protected void body() throws InterruptedException {
- while (!isCancelled()) {
- IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
-
- if (!busyLock.enterBusy())
- return;
-
- try {
- if (ldr.isClosed())
- continue;
-
- ldr.tryFlush();
-
- flushQ.offer(ldr);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- }
- });
-
- flusher.start();
-
- if (log.isDebugEnabled())
- log.debug("Started data streamer processor.");
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- if (ctx.config().isDaemon())
- return;
-
- ctx.io().removeMessageListener(TOPIC_DATALOAD);
-
- busyLock.block();
-
- U.interrupt(flusher);
- U.join(flusher, log);
-
- for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
- if (log.isDebugEnabled())
- log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
-
- try {
- ldr.closeEx(cancel);
- }
- catch (IgniteInterruptedCheckedException e) {
- U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to close data streamer: " + ldr, e);
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Stopped data streamer processor.");
- }
-
- /**
- * @param cacheName Cache name ({@code null} for default cache).
- * @param compact {@code true} if data streamer should transfer data in compact format.
- * @return Data streamer.
- */
- public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
- if (!busyLock.enterBusy())
- throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
-
- try {
- final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
-
- ldrs.add(ldr);
-
- ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- boolean b = ldrs.remove(ldr);
-
- assert b : "Loader has not been added to set: " + ldr;
-
- if (log.isDebugEnabled())
- log.debug("Loader has been completed: " + ldr);
- }
- });
-
- return ldr;
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * @param cacheName Cache name ({@code null} for default cache).
- * @return Data streamer.
- */
- public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
- return dataStreamer(cacheName, true);
- }
-
- /**
- * @param nodeId Sender ID.
- * @param req Request.
- */
- private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
- if (!busyLock.enterBusy()) {
- if (log.isDebugEnabled())
- log.debug("Ignoring data load request (node is stopping): " + req);
-
- return;
- }
-
- try {
- if (log.isDebugEnabled())
- log.debug("Processing data load request: " + req);
-
- Object topic;
-
- try {
- topic = marsh.unmarshal(req.responseTopicBytes(), null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal topic from request: " + req, e);
-
- return;
- }
-
- ClassLoader clsLdr;
-
- if (req.forceLocalDeployment())
- clsLdr = U.gridClassLoader();
- else {
- GridDeployment dep = ctx.deploy().getGlobalDeployment(
- req.deploymentMode(),
- req.sampleClassName(),
- req.sampleClassName(),
- req.userVersion(),
- nodeId,
- req.classLoaderId(),
- req.participants(),
- null);
-
- if (dep == null) {
- sendResponse(nodeId,
- topic,
- req.requestId(),
- new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
- ", req=" + req + ']'),
- false);
-
- return;
- }
-
- clsLdr = dep.classLoader();
- }
-
- Collection<Map.Entry<K, V>> col;
- IgniteDataStreamer.Updater<K, V> updater;
-
- try {
- col = marsh.unmarshal(req.collectionBytes(), clsLdr);
- updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
-
- sendResponse(nodeId, topic, req.requestId(), e, false);
-
- return;
- }
-
- IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
- log,
- req.cacheName(),
- col,
- req.ignoreDeploymentOwnership(),
- req.skipStore(),
- updater);
-
- Exception err = null;
-
- try {
- job.call();
- }
- catch (Exception e) {
- U.error(log, "Failed to finish update job.", e);
-
- err = e;
- }
-
- sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param resTopic Response topic.
- * @param reqId Request ID.
- * @param err Error.
- * @param forceLocDep Force local deployment.
- */
- private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
- boolean forceLocDep) {
- byte[] errBytes;
-
- try {
- errBytes = err != null ? marsh.marshal(err) : null;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal message.", e);
-
- return;
- }
-
- GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
-
- try {
- ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
- }
- catch (IgniteCheckedException e) {
- if (ctx.discovery().alive(nodeId))
- U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
- else if (log.isDebugEnabled())
- log.debug("Node has left the grid: " + nodeId);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void printMemoryStats() {
- X.println(">>>");
- X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
- X.println(">>> ldrsSize: " + ldrs.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
deleted file mode 100644
index 1a3db40..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
- /** */
- private final GridKernalContext ctx;
-
- /** */
- private final IgniteLogger log;
-
- /** Cache name. */
- private final String cacheName;
-
- /** Entries to put. */
- private final Collection<Map.Entry<K, V>> col;
-
- /** {@code True} to ignore deployment ownership. */
- private final boolean ignoreDepOwnership;
-
- /** */
- private final boolean skipStore;
-
- /** */
- private final IgniteDataStreamer.Updater<K, V> updater;
-
- /**
- * @param ctx Context.
- * @param log Log.
- * @param cacheName Cache name.
- * @param col Entries to put.
- * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
- * @param updater Updater.
- */
- IgniteDataStreamerUpdateJob(
- GridKernalContext ctx,
- IgniteLogger log,
- @Nullable String cacheName,
- Collection<Map.Entry<K, V>> col,
- boolean ignoreDepOwnership,
- boolean skipStore,
- IgniteDataStreamer.Updater<K, V> updater) {
- this.ctx = ctx;
- this.log = log;
-
- assert col != null && !col.isEmpty();
- assert updater != null;
-
- this.cacheName = cacheName;
- this.col = col;
- this.ignoreDepOwnership = ignoreDepOwnership;
- this.skipStore = skipStore;
- this.updater = updater;
- }
-
- /** {@inheritDoc} */
- @Override public Object call() throws Exception {
- if (log.isDebugEnabled())
- log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
-
-// TODO IGNITE-77: restore adapter usage.
-// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-//
-// IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-// if (!f.isDone())
-// f.get();
-//
-// if (ignoreDepOwnership)
-// cache.context().deploy().ignoreOwnership(true);
-
- IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
- if (skipStore)
- cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
- if (ignoreDepOwnership)
- cache.context().deploy().ignoreOwnership(true);
-
- try {
- updater.update(cache, col);
-
- return null;
- }
- finally {
- if (ignoreDepOwnership)
- cache.context().deploy().ignoreOwnership(false);
-
- if (log.isDebugEnabled())
- log.debug("Update job finished on node: " + ctx.localNodeId());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
deleted file mode 100644
index 1090b86..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
- <!-- Package description. -->
- Data streamer processor.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
new file mode 100644
index 0000000..d77b52e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ *
+ */
+public class GridDataLoadRequest implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long reqId;
+
+ /** */
+ private byte[] resTopicBytes;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /** */
+ private byte[] updaterBytes;
+
+ /** Entries to put. */
+ private byte[] colBytes;
+
+ /** {@code True} to ignore deployment ownership. */
+ private boolean ignoreDepOwnership;
+
+ /** */
+ private boolean skipStore;
+
+ /** */
+ private DeploymentMode depMode;
+
+ /** */
+ private String sampleClsName;
+
+ /** */
+ private String userVer;
+
+ /** Node class loader participants. */
+ @GridToStringInclude
+ @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+ private Map<UUID, IgniteUuid> ldrParticipants;
+
+ /** */
+ private IgniteUuid clsLdrId;
+
+ /** */
+ private boolean forceLocDep;
+
+ /**
+ * {@code Externalizable} support.
+ */
+ public GridDataLoadRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param resTopicBytes Response topic.
+ * @param cacheName Cache name.
+ * @param updaterBytes Cache updater.
+ * @param colBytes Collection bytes.
+ * @param ignoreDepOwnership Ignore ownership.
+ * @param skipStore Skip store flag.
+ * @param depMode Deployment mode.
+ * @param sampleClsName Sample class name.
+ * @param userVer User version.
+ * @param ldrParticipants Loader participants.
+ * @param clsLdrId Class loader ID.
+ * @param forceLocDep Force local deployment.
+ */
+ public GridDataLoadRequest(long reqId,
+ byte[] resTopicBytes,
+ @Nullable String cacheName,
+ byte[] updaterBytes,
+ byte[] colBytes,
+ boolean ignoreDepOwnership,
+ boolean skipStore,
+ DeploymentMode depMode,
+ String sampleClsName,
+ String userVer,
+ Map<UUID, IgniteUuid> ldrParticipants,
+ IgniteUuid clsLdrId,
+ boolean forceLocDep) {
+ this.reqId = reqId;
+ this.resTopicBytes = resTopicBytes;
+ this.cacheName = cacheName;
+ this.updaterBytes = updaterBytes;
+ this.colBytes = colBytes;
+ this.ignoreDepOwnership = ignoreDepOwnership;
+ this.skipStore = skipStore;
+ this.depMode = depMode;
+ this.sampleClsName = sampleClsName;
+ this.userVer = userVer;
+ this.ldrParticipants = ldrParticipants;
+ this.clsLdrId = clsLdrId;
+ this.forceLocDep = forceLocDep;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Response topic.
+ */
+ public byte[] responseTopicBytes() {
+ return resTopicBytes;
+ }
+
+ /**
+ * @return Cache name.
+ */
+ public String cacheName() {
+ return cacheName;
+ }
+
+ /**
+ * @return Updater.
+ */
+ public byte[] updaterBytes() {
+ return updaterBytes;
+ }
+
+ /**
+ * @return Collection bytes.
+ */
+ public byte[] collectionBytes() {
+ return colBytes;
+ }
+
+ /**
+ * @return {@code True} to ignore ownership.
+ */
+ public boolean ignoreDeploymentOwnership() {
+ return ignoreDepOwnership;
+ }
+
+ /**
+ * @return Skip store flag.
+ */
+ public boolean skipStore() {
+ return skipStore;
+ }
+
+ /**
+ * @return Deployment mode.
+ */
+ public DeploymentMode deploymentMode() {
+ return depMode;
+ }
+
+ /**
+ * @return Sample class name.
+ */
+ public String sampleClassName() {
+ return sampleClsName;
+ }
+
+ /**
+ * @return User version.
+ */
+ public String userVersion() {
+ return userVer;
+ }
+
+ /**
+ * @return Participants.
+ */
+ public Map<UUID, IgniteUuid> participants() {
+ return ldrParticipants;
+ }
+
+ /**
+ * @return Class loader ID.
+ */
+ public IgniteUuid classLoaderId() {
+ return clsLdrId;
+ }
+
+ /**
+ * @return {@code True} to force local deployment.
+ */
+ public boolean forceLocalDeployment() {
+ return forceLocDep;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDataLoadRequest.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeString("cacheName", cacheName))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeByteArray("colBytes", colBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeBoolean("forceLocDep", forceLocDep))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeString("sampleClsName", sampleClsName))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("skipStore", skipStore))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeByteArray("updaterBytes", updaterBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeString("userVer", userVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cacheName = reader.readString("cacheName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ clsLdrId = reader.readIgniteUuid("clsLdrId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ colBytes = reader.readByteArray("colBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ byte depModeOrd;
+
+ depModeOrd = reader.readByte("depMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ depMode = DeploymentMode.fromOrdinal(depModeOrd);
+
+ reader.incrementState();
+
+ case 4:
+ forceLocDep = reader.readBoolean("forceLocDep");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ resTopicBytes = reader.readByteArray("resTopicBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ sampleClsName = reader.readString("sampleClsName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ skipStore = reader.readBoolean("skipStore");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ updaterBytes = reader.readByteArray("updaterBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ userVer = reader.readString("userVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 62;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 13;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
new file mode 100644
index 0000000..25ff9ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+
+/**
+ *
+ */
+public class GridDataLoadResponse implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long reqId;
+
+ /** */
+ private byte[] errBytes;
+
+ /** */
+ private boolean forceLocDep;
+
+ /**
+ * @param reqId Request ID.
+ * @param errBytes Error bytes.
+ * @param forceLocDep Force local deployment.
+ */
+ public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
+ this.reqId = reqId;
+ this.errBytes = errBytes;
+ this.forceLocDep = forceLocDep;
+ }
+
+ /**
+ * {@code Externalizable} support.
+ */
+ public GridDataLoadResponse() {
+ // No-op.
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Error bytes.
+ */
+ public byte[] errorBytes() {
+ return errBytes;
+ }
+
+ /**
+ * @return {@code True} to force local deployment.
+ */
+ public boolean forceLocalDeployment() {
+ return forceLocDep;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDataLoadResponse.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeBoolean("forceLocDep", forceLocDep))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ forceLocDep = reader.readBoolean("forceLocDep");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 63;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..629c7b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class IgniteDataStreamerCacheUpdaters {
+ /** */
+ private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
+
+ /** */
+ private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+ /** */
+ private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
+
+ /**
+ * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
+ * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
+ * is not the best.
+ *
+ * @return Single updater.
+ */
+ public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+ return INDIVIDUAL;
+ }
+
+ /**
+ * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+ * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
+ * updated concurrently. Performance is generally better than in {@link #individual()}.
+ *
+ * @return Batched updater.
+ */
+ public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+ return BATCHED;
+ }
+
+ /**
+ * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+ * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
+ * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
+ *
+ * @return Batched sorted updater.
+ */
+ public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
+ return BATCHED_SORTED;
+ }
+
+ /**
+ * Updates cache.
+ *
+ * @param cache Cache.
+ * @param rmvCol Keys to remove.
+ * @param putMap Entries to put.
+ * @throws IgniteException If failed.
+ */
+ protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
+ Map<K, V> putMap) {
+ assert rmvCol != null || putMap != null;
+
+ // Here we assume that there are no key duplicates, so the following calls are valid.
+ if (rmvCol != null)
+ ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+ if (putMap != null)
+ cache.putAll(putMap);
+ }
+
+ /**
+ * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
+ */
+ private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ V val = entry.getValue();
+
+ if (val == null)
+ cache.remove(key);
+ else
+ cache.put(key, val);
+ }
+ }
+ }
+
+ /**
+ * Batched updater. Updates cache using batch operations thus is dead lock prone.
+ */
+ private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ Map<K, V> putAll = null;
+ Set<K> rmvAll = null;
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ V val = entry.getValue();
+
+ if (val == null) {
+ if (rmvAll == null)
+ rmvAll = new HashSet<>();
+
+ rmvAll.add(key);
+ }
+ else {
+ if (putAll == null)
+ putAll = new HashMap<>();
+
+ putAll.put(key, val);
+ }
+ }
+
+ updateAll(cache, rmvAll, putAll);
+ }
+ }
+
+ /**
+ * Batched updater. Updates cache using batch operations thus is dead lock prone.
+ */
+ private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ Map<K, V> putAll = null;
+ Set<K> rmvAll = null;
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key instanceof Comparable;
+
+ V val = entry.getValue();
+
+ if (val == null) {
+ if (rmvAll == null)
+ rmvAll = new TreeSet<>();
+
+ rmvAll.add(key);
+ }
+ else {
+ if (putAll == null)
+ putAll = new TreeMap<>();
+
+ putAll.put(key, val);
+ }
+ }
+
+ updateAll(cache, rmvAll, putAll);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
new file mode 100644
index 0000000..b6aa15c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Data streamer future.
+ */
+class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Data streamer. */
+ @GridToStringExclude
+ private IgniteDataStreamerImpl dataLdr;
+
+ /**
+ * Default constructor for {@link Externalizable} support.
+ */
+ public IgniteDataStreamerFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Context.
+ * @param dataLdr Data streamer.
+ */
+ IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
+ super(ctx);
+
+ assert dataLdr != null;
+
+ this.dataLdr = dataLdr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() throws IgniteCheckedException {
+ checkValid();
+
+ if (onCancelled()) {
+ dataLdr.closeEx(true);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
+ }
+}