You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/04 12:06:17 UTC

incubator-ignite git commit: 9149 logging

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-gg-9149 [created] d39b47711


9149 logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d39b4771
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d39b4771
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d39b4771

Branch: refs/heads/ignite-gg-9149
Commit: d39b47711d6a48e80d6434b483c321203638fc32
Parents: b056a73
Author: Anton Vinogradov <av...@gridgain.com>
Authored: Tue Aug 4 13:03:54 2015 +0300
Committer: Anton Vinogradov <av...@gridgain.com>
Committed: Tue Aug 4 13:03:54 2015 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          | 22 ++++++++++++++++++++
 .../dr/IgniteDrDataStreamerCacheUpdater.java    | 15 ++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d39b4771/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 5fae676..5659d6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -472,6 +472,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         return addDataInternal(Collections.singleton(new DataStreamerEntry(key, null)));
     }
 
+    public static ConcurrentHashMap<Integer, ConcurrentLinkedQueue> map = new ConcurrentHashMap<>();
+
     /**
      * @param entries Entries.
      * @return Future.
@@ -1291,6 +1293,26 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     topVer);
 
                 try {
+
+                    for (DataStreamerEntry entry : entries) {
+                        try {
+
+                            byte[] bytes = ((GridCacheRawVersionedEntry)entry).valueBytes();
+                            if (bytes != null) {
+                                CacheObject val = ctx.config().getMarshaller().unmarshal(bytes, null);
+                                val.finishUnmarshal(cacheObjCtx, null);
+
+                                Integer key0 = (Integer)(entry.getKey().value(null, false));
+                                map.putIfAbsent(key0, new ConcurrentLinkedQueue());
+                                map.get(key0).add(val.value(null, false));
+
+                            }
+                            // entry.setValue(null);
+                        }
+                        catch (Exception e) {
+                        }
+                    }
+
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
 
                     if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d39b4771/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
index e5bbe39..b0169c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.dr;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -29,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.stream.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 
 /**
  * Data center replication cache receiver for data streamer.
@@ -38,6 +41,8 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache
     /** */
     private static final long serialVersionUID = 0L;
 
+    public static ConcurrentHashMap<Integer, ConcurrentLinkedQueue> map = new ConcurrentHashMap<>();
+
     /** {@inheritDoc} */
     @Override public void receive(IgniteCache<KeyCacheObject, CacheObject> cache0,
         Collection<Map.Entry<KeyCacheObject, CacheObject>> col) {
@@ -79,8 +84,16 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache
 
                 if (val == null)
                     cache.removeAllConflict(Collections.singletonMap(key, entry.version()));
-                else
+                else {
+                    Affinity affinity = cache0.unwrap(Ignite.class).affinity(cache0.getName());
+                    ClusterNode localNode = cache0.unwrap(Ignite.class).cluster().localNode();
+
+                    Integer key0 = (key.value(null, false));
+
+                    map.putIfAbsent(key0, new ConcurrentLinkedQueue());
+                    map.get(key0).add(localNode.id() + " - " + val.value().value(null, false) + "-prim " + affinity.isPrimary(localNode, key) + " ver " + entry.version());
                     cache.putAllConflict(Collections.singletonMap(key, val));
+                }
             }
 
             if (log.isDebugEnabled())