You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/04 12:06:17 UTC
incubator-ignite git commit: 9149 logging
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-gg-9149 [created] d39b47711
9149 logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d39b4771
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d39b4771
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d39b4771
Branch: refs/heads/ignite-gg-9149
Commit: d39b47711d6a48e80d6434b483c321203638fc32
Parents: b056a73
Author: Anton Vinogradov <av...@gridgain.com>
Authored: Tue Aug 4 13:03:54 2015 +0300
Committer: Anton Vinogradov <av...@gridgain.com>
Committed: Tue Aug 4 13:03:54 2015 +0300
----------------------------------------------------------------------
.../datastreamer/DataStreamerImpl.java | 22 ++++++++++++++++++++
.../dr/IgniteDrDataStreamerCacheUpdater.java | 15 ++++++++++++-
2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d39b4771/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 5fae676..5659d6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -472,6 +472,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
return addDataInternal(Collections.singleton(new DataStreamerEntry(key, null)));
}
+ public static ConcurrentHashMap<Integer, ConcurrentLinkedQueue> map = new ConcurrentHashMap<>();
+
/**
* @param entries Entries.
* @return Future.
@@ -1291,6 +1293,26 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
topVer);
try {
+
+ for (DataStreamerEntry entry : entries) {
+ try {
+
+ byte[] bytes = ((GridCacheRawVersionedEntry)entry).valueBytes();
+ if (bytes != null) {
+ CacheObject val = ctx.config().getMarshaller().unmarshal(bytes, null);
+ val.finishUnmarshal(cacheObjCtx, null);
+
+ Integer key0 = (Integer)(entry.getKey().value(null, false));
+ map.putIfAbsent(key0, new ConcurrentLinkedQueue());
+ map.get(key0).add(val.value(null, false));
+
+ }
+ // entry.setValue(null);
+ }
+ catch (Exception e) {
+ }
+ }
+
ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d39b4771/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
index e5bbe39..b0169c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.dr;
import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
@@ -29,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.stream.*;
import java.util.*;
+import java.util.concurrent.*;
/**
* Data center replication cache receiver for data streamer.
@@ -38,6 +41,8 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache
/** */
private static final long serialVersionUID = 0L;
+ public static ConcurrentHashMap<Integer, ConcurrentLinkedQueue> map = new ConcurrentHashMap<>();
+
/** {@inheritDoc} */
@Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache0,
Collection<Map.Entry<KeyCacheObject, CacheObject>> col) {
@@ -79,8 +84,16 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache
if (val == null)
cache.removeAllConflict(Collections.singletonMap(key, entry.version()));
- else
+ else {
+ Affinity affinity = cache0.unwrap(Ignite.class).affinity(cache0.getName());
+ ClusterNode localNode = cache0.unwrap(Ignite.class).cluster().localNode();
+
+ Integer key0 = (key.value(null, false));
+
+ map.putIfAbsent(key0, new ConcurrentLinkedQueue());
+ map.get(key0).add(localNode.id() + " - " + val.value().value(null, false) + "-prim " + affinity.isPrimary(localNode, key) + " ver " + entry.version());
cache.putAllConflict(Collections.singletonMap(key, val));
+ }
}
if (log.isDebugEnabled())