You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/23 03:36:30 UTC

[01/23] ignite git commit: debugging ignite-1171

Repository: ignite
Updated Branches:
  refs/heads/ignite-1171 46a22e3a7 -> f5dcaf35e


debugging ignite-1171


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10ee1a55
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10ee1a55
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10ee1a55

Branch: refs/heads/ignite-1171
Commit: 10ee1a5563f106c7f00f7e5a999746da7b944d46
Parents: d3dd2cc
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Sep 18 21:29:52 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Sep 18 21:29:52 2015 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java |  22 +++-
 .../communication/tcp/TcpCommunicationSpi.java  |   5 +-
 .../discovery/DiscoverySpiCustomMessage.java    |  12 ++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 104 +++++++++++++------
 .../spi/discovery/tcp/TcpDiscoveryImpl.java     |   6 +-
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |   2 +-
 6 files changed, 104 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index da02b97..eaa66af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -47,6 +47,7 @@ import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -448,8 +449,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             taskNameHash,
             skipPrimaryCheck);
 
-        UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval,
-            autoUnsubscribe, grp.predicate()).get();
+        IgniteInternalFuture<UUID> f = cctx.kernalContext().continuous().startRoutine(
+            hnd,
+            bufSize,
+            timeInterval,
+            autoUnsubscribe,
+            grp.predicate());
+
+        while (!f.isDone()) {
+            try {
+                f.get(2000);
+            }
+            catch (Exception e) {
+                U.debug(log, "### Failed to wait for future: " + cctx.gridName() + " " + cctx.nodeId() + " " + f);
+            }
+        }
+
+        UUID id = f.get();
 
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();
@@ -811,4 +827,4 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 144a0fd..c93d5af 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -2126,8 +2126,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
                     timeoutHelper.checkFailureTimeoutReached(e))) {
-                    log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
-                        failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
+                            failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
 
                     throw e;
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
index 373c121..a0f9b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,13 +18,15 @@
 package org.apache.ignite.spi.discovery;
 
 import java.io.Serializable;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Message to send across ring.
  *
- * @see org.apache.ignite.internal.managers.discovery.GridDiscoveryManager#sendCustomEvent(
- * org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage)
+ * @see GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)
  */
 public interface DiscoverySpiCustomMessage extends Serializable {
     /**
@@ -36,4 +38,4 @@ public interface DiscoverySpiCustomMessage extends Serializable {
      * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes.
      */
     public boolean isMutable();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 92f90a1..9d0b3c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -715,7 +716,14 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** {@inheritDoc} */
     @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
         try {
-            msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt, spi.marsh.marshal(evt)));
+            TcpDiscoveryCustomEventMessage msg = new TcpDiscoveryCustomEventMessage(
+                getLocalNodeId(),
+                evt,
+                spi.marsh.marshal(evt));
+
+            U.debug(log, "Sending custom event: " + msg);
+
+            msgWorker.addMessage(msg);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
@@ -1857,6 +1865,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             while (msgs.size() > MAX) {
                 TcpDiscoveryAbstractMessage polled = msgs.poll();
 
+                if (polled instanceof DiscoveryCustomMessage)
+                    U.debug("### Discarded custom message ###: " + msg);
+
                 assert polled != null;
 
                 if (polled.id().equals(discardId))
@@ -1865,30 +1876,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Resets pending messages.
-         *
-         * @param msgs Message.
-         * @param discardId Discarded message ID.
-         */
-        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
-            this.msgs.clear();
-
-            if (msgs != null)
-                this.msgs.addAll(msgs);
-
-            this.discardId = discardId;
-        }
-
-        /**
-         * Clears pending messages.
-         */
-        void clear() {
-            msgs.clear();
-
-            discardId = null;
-        }
-
-        /**
          * Discards message with provided ID and all before it.
          *
          * @param id Discarded message ID.
@@ -1943,7 +1930,7 @@ class ServerImpl extends TcpDiscoveryImpl {
         private long connCheckThreshold;
 
         /** Pending custom messages that should not be sent between NodeAdded and NodeAddFinished messages. */
-        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new LinkedList<>();
+        private Queue<TcpDiscoveryCustomEventMessage> pendingCustomMsgs = new ArrayDeque<>();
 
         /** Collection to track joining nodes. */
         private Set<UUID> joiningNodes = new HashSet<>();
@@ -2053,6 +2040,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             sendHeartbeatMessage();
 
             checkHeartbeatsReceiving();
+
+            checkPendingCustomMessages();
         }
 
         /**
@@ -2326,6 +2315,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     log.debug("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
 
+                                U.debug(
+                                    "### Pending messages will be sent [failure=" + failure +
+                                        ", forceSndPending=" + forceSndPending + ']');
+
                                 if (debugMode)
                                     debugLog("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
@@ -2337,9 +2330,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         if (pendingMsg.id().equals(pendingMsgs.discardId))
                                             skip = false;
 
-                                        continue;
+                                        if (!(msg instanceof DiscoveryCustomMessage))
+                                            continue;
+                                        else
+                                            U.debug(log, "Avoid skipping custom message: " + pendingMsg);
                                     }
 
+                                    U.debug(log, "Sending pending: " + pendingMsg);
+
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
@@ -2361,13 +2359,18 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
 
                                     if (log.isDebugEnabled())
-                                        log.debug("Pending message has been sent to next node [msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+                                        log.debug("Pending message has been sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
+                                            ", res=" + res + ']');
+
+                                    if (msg instanceof TcpDiscoveryCustomEventMessage)
+                                        U.debug(log, "Pending message has been sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
                                     if (debugMode)
-                                        debugLog("Pending message has been sent to next node [msg=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg + ", next=" + next.id() +
+                                        debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
                                     // Resetting timeout control object to create a new one for the next bunch of
@@ -2405,6 +2408,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
 
+                                if (msg instanceof TcpDiscoveryCustomEventMessage)
+                                U.debug(log, "Message has been sent to next node [msg=" + msg +
+                                        ", next=" + next.id() +
+                                        ", res=" + res + ']');
+
                                 if (debugMode)
                                     debugLog("Message has been sent to next node [msg=" + msg +
                                         ", next=" + next.id() +
@@ -3132,6 +3140,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.add(node.id());
 
+                U.debug(log, "Added to joining: " + node.id());
+
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
 
@@ -3236,6 +3246,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 n.visible(true);
                             }
 
+                            joiningNodes.clear();
+
                             locNode.setAttributes(node.attributes());
 
                             locNode.visible(true);
@@ -3573,6 +3585,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(leftNode.id());
 
+                U.debug(log, "removed from joining 3568: " + leftNode.id());
+
                 spi.stats.onNodeLeft();
 
                 notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3731,6 +3745,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(node.id());
 
+                U.debug(log, "removed from joining 3728: " + node.id());
+
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();
@@ -4127,6 +4143,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (!joiningNodes.isEmpty()) {
                     pendingCustomMsgs.add(msg);
 
+                    U.debug(log, "Added to pending: " + msg);
+
                     return;
                 }
 
@@ -4138,6 +4156,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     notifyDiscoveryListener(msg);
 
+                    U.debug(log, "Verified: " + msg);
+
                     sndNext = true;
                 }
                 else
@@ -4171,22 +4191,40 @@ class ServerImpl extends TcpDiscoveryImpl {
                         }
                     }
 
-                    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
+                    U.debug(log, "Discarding custom message: " + msg);
+
+                    //addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
                 }
             }
             else {
-                if (msg.verified())
+                if (msg.verified()) {
+                    assert joiningNodes.isEmpty();
+
+                    U.debug(log, "Processing custom message: " + msg);
+
                     notifyDiscoveryListener(msg);
+                }
 
                 if (ring.hasRemoteNodes())
                     sendMessageAcrossRing(msg);
             }
         }
 
+        long lastCheck = U.currentTimeMillis();
+
         /**
          * Checks and flushes custom event messages if no nodes are attempting to join the grid.
          */
         private void checkPendingCustomMessages() {
+            if (lastCheck + 2000 < U.currentTimeMillis()) {
+                U.debug(
+                    log,
+                    "Custom messages [msgs=" + pendingCustomMsgs.size() + ", locNodeId=" + locNode.id() +
+                        ", locNodeOrder=" + locNode.order() + ", joining=" + joiningNodes + ']');
+
+                lastCheck = U.currentTimeMillis();
+            }
+
             if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
index e5be530..2786d0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryImpl.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -313,4 +313,4 @@ abstract class TcpDiscoveryImpl {
 
         return res;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ee1a55/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 4f3c9a9..1ccbe1f 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -293,4 +293,4 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest {
             stopAllGrids();
         }
     }
-}
\ No newline at end of file
+}


[12/23] ignite git commit: ignite-973 - fix

Posted by ag...@apache.org.
ignite-973 - fix


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0be45e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0be45e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0be45e3

Branch: refs/heads/ignite-1171
Commit: f0be45e309f9a594334209a251c069f9ba3db120
Parents: e51fb42
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:36:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:36:40 2015 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2AbstractKeyValueRow.java           |  3 +++
 .../internal/processors/query/h2/opt/GridH2Table.java     | 10 +++++++++-
 .../internal/processors/cache/CacheIndexStreamerTest.java |  4 ++--
 3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 07c49a5..4a16284 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,6 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
      * @throws IgniteCheckedException If failed.
      */
     public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
+        if (peekValue(VAL_COL) != null)
+            return;
+
         setValue(VAL_COL, desc.wrap(val, desc.valueType()));
 
         notifyAll();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 66241b4..bf318b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -55,6 +55,8 @@ import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
+
 /**
  * H2 Table implementation.
  */
@@ -372,6 +374,12 @@ public class GridH2Table extends TableBase {
             if (!del) {
                 GridH2Row old = pk.put(row); // Put to PK.
 
+                if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value on replace.
+                    GridH2AbstractKeyValueRow kvOld = (GridH2AbstractKeyValueRow)old;
+
+                    kvOld.onUnswap(kvOld.getValue(VAL_COL), true);
+                }
+
                 int len = idxs.size();
 
                 int i = 1;
@@ -399,7 +407,7 @@ public class GridH2Table extends TableBase {
                 GridH2Row old = pk.remove(row);
 
                 if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value.
-                    Value v = row.getValue(GridH2AbstractKeyValueRow.VAL_COL);
+                    Value v = row.getValue(VAL_COL);
 
                     if (v != null)
                         ((GridH2AbstractKeyValueRow)old).onUnswap(v.getObject(), true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 25c3b81..23f4e91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -52,7 +52,7 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
-        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+        cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
 
         return cfg;
     }


[04/23] ignite git commit: IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.

Posted by ag...@apache.org.
IGNITE-1171 - Delay custom messages between NodeAdded and NodeAddFinished messages.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19e34f6c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19e34f6c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19e34f6c

Branch: refs/heads/ignite-1171
Commit: 19e34f6c69d36993916b7163d38b88c022005b30
Parents: 10ee1a5
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Fri Sep 18 17:51:15 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Sep 18 17:51:15 2015 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 44 ++++++--------------
 1 file changed, 12 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19e34f6c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 9d0b3c7..d3af48c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -64,7 +64,6 @@ import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -1865,9 +1864,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             while (msgs.size() > MAX) {
                 TcpDiscoveryAbstractMessage polled = msgs.poll();
 
-                if (polled instanceof DiscoveryCustomMessage)
-                    U.debug("### Discarded custom message ###: " + msg);
-
                 assert polled != null;
 
                 if (polled.id().equals(discardId))
@@ -2315,10 +2311,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     log.debug("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
 
-                                U.debug(
-                                    "### Pending messages will be sent [failure=" + failure +
-                                        ", forceSndPending=" + forceSndPending + ']');
-
                                 if (debugMode)
                                     debugLog("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
@@ -2330,14 +2322,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                                         if (pendingMsg.id().equals(pendingMsgs.discardId))
                                             skip = false;
 
-                                        if (!(msg instanceof DiscoveryCustomMessage))
+                                        if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
                                             continue;
-                                        else
-                                            U.debug(log, "Avoid skipping custom message: " + pendingMsg);
                                     }
 
-                                    U.debug(log, "Sending pending: " + pendingMsg);
-
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
@@ -2363,11 +2351,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
-                                    if (msg instanceof TcpDiscoveryCustomEventMessage)
-                                        U.debug(log, "Pending message has been sent to next node [msgId=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
-                                            ", res=" + res + ']');
-
                                     if (debugMode)
                                         debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
@@ -3263,6 +3246,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
 
+                            pendingMsgs.msgs.clear();
+                            pendingMsgs.msgs.addAll(msg.messages());
                             pendingMsgs.discard(msg.discardedMessageId());
 
                             // Clear data to minimize message size.
@@ -4197,10 +4182,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
             else {
-                if (msg.verified()) {
-                    assert joiningNodes.isEmpty();
+                TcpDiscoverySpiState state0;
 
-                    U.debug(log, "Processing custom message: " + msg);
+                synchronized (mux) {
+                    state0 = spiState;
+                }
+
+                assert !(msg.verified() && state0 == CONNECTING);
+
+                if (msg.verified() && state0 == CONNECTED) {
+                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
 
                     notifyDiscoveryListener(msg);
                 }
@@ -4210,21 +4201,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
         }
 
-        long lastCheck = U.currentTimeMillis();
-
         /**
          * Checks and flushes custom event messages if no nodes are attempting to join the grid.
          */
         private void checkPendingCustomMessages() {
-            if (lastCheck + 2000 < U.currentTimeMillis()) {
-                U.debug(
-                    log,
-                    "Custom messages [msgs=" + pendingCustomMsgs.size() + ", locNodeId=" + locNode.id() +
-                        ", locNodeOrder=" + locNode.order() + ", joining=" + joiningNodes + ']');
-
-                lastCheck = U.currentTimeMillis();
-            }
-
             if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) {
                 TcpDiscoveryCustomEventMessage msg;
 


[22/23] ignite git commit: Merge branch 'ignite-1.4' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171

Posted by ag...@apache.org.
Merge branch 'ignite-1.4' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70b26fec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70b26fec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70b26fec

Branch: refs/heads/ignite-1171
Commit: 70b26fec9439594762bead8ab45ead05d946be1e
Parents: 46a22e3 1942d75
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 22 18:30:17 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 22 18:30:17 2015 -0700

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java |  17 +--
 .../configuration/CacheConfiguration.java       |  15 +++
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheMapEntry.java     |  51 ++++----
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../cache/GridCacheSwapEntryImpl.java           |  31 ++++-
 .../processors/cache/GridCacheSwapManager.java  |  80 ++++++++-----
 .../datastreamer/DataStreamerImpl.java          |   2 -
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  10 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |  65 +++++++++-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  33 ++++++
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  19 +--
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  54 ++++++---
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  11 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   5 +
 .../processors/query/h2/opt/GridH2Table.java    |  10 +-
 .../cache/CacheIndexStreamerTest.java           |  37 ++++--
 .../processors/cache/GridCacheSwapSelfTest.java |   4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 21 files changed, 453 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/70b26fec/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/70b26fec/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------


[18/23] ignite git commit: 1171-debug - Fix WIP.

Posted by ag...@apache.org.
1171-debug - Fix WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/47f9605f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/47f9605f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/47f9605f

Branch: refs/heads/ignite-1171
Commit: 47f9605faee1972750efed4c4db7a9b36bb8f3b5
Parents: 271b750
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 22 15:23:20 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 22 15:23:20 2015 -0700

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |   3 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 168 ++++++++++++-------
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  14 --
 3 files changed, 113 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e5090cb..e730edc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1022,6 +1022,9 @@ public abstract class IgniteUtils {
      */
     @Deprecated
     public static void debug(IgniteLogger log, String msg) {
+        if (true)
+            return;
+
         log.info(msg);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3d624d8..06a6bb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1377,7 +1377,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         UUID destNodeId,
         @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
         @Nullable IgniteUuid discardMsgId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
         @Nullable IgniteUuid discardCustomMsgId
         ) {
         assert destNodeId != null;
@@ -1403,7 +1402,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId);
+                nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1426,7 +1425,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             nodeAddedMsg.topology(null);
             nodeAddedMsg.topologyHistory(null);
-            nodeAddedMsg.messages(null, null, null, null);
+            nodeAddedMsg.messages(null, null, null);
         }
     }
 
@@ -1835,7 +1834,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                prepareNodeAddedMessage(msg, destNodeId, null, null, null, null);
+                prepareNodeAddedMessage(msg, destNodeId, null, null, null);
 
             return msg;
         }
@@ -1851,9 +1850,6 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** Pending messages. */
         private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
 
-        /** Pending messages. */
-        private final Queue<TcpDiscoveryAbstractMessage> customMsgs = new ArrayDeque<>(MAX * 2);
-
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
 
@@ -1870,12 +1866,10 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            Queue<TcpDiscoveryAbstractMessage> msgs0 = msg instanceof TcpDiscoveryCustomEventMessage ? customMsgs : msgs;
+            msgs.add(msg);
 
-            msgs0.add(msg);
-
-            while (msgs0.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs0.poll();
+            while (msgs.size() > MAX) {
+                TcpDiscoveryAbstractMessage polled = msgs.poll();
 
                 assert polled != null;
 
@@ -1890,25 +1884,18 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msgs Message.
          * @param discardId Discarded message ID.
          */
-        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId,
-            @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs, @Nullable IgniteUuid duscardCustomId) {
+        void reset(
+            @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+            @Nullable IgniteUuid discardId,
+            @Nullable IgniteUuid customDiscardId
+        ) {
             this.msgs.clear();
-            this.customMsgs.clear();
 
-            if (msgs != null) {
-                // Backward compatibility: old nodes send messages in one collection.
-                for (TcpDiscoveryAbstractMessage msg : msgs) {
-                    if (msg instanceof TcpDiscoveryCustomEventMessage)
-                        this.customMsgs.add(msg);
-                    else
-                        this.msgs.add(msg);
-                }
-            }
-
-            if (customMsgs != null)
-                this.customMsgs.addAll(customMsgs);
+            if (msgs != null)
+                this.msgs.addAll(msgs);
 
             this.discardId = discardId;
+            this.customDiscardId = customDiscardId;
         }
 
         /**
@@ -1929,31 +1916,86 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @return Non-discarded messages iterator.
          */
         public Iterator<TcpDiscoveryAbstractMessage> iterator() {
-            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+            return new SkipIterator();
+        }
 
-            if (discardId != null) {
-                while (msgIt.hasNext()) {
-                    TcpDiscoveryAbstractMessage msg = msgIt.next();
+        /**
+         *
+         */
+        private class SkipIterator implements Iterator<TcpDiscoveryAbstractMessage> {
+            /** Skip non-custom messages flag. */
+            private boolean skipMsg = discardId != null;
 
-                    // Skip all messages before discarded, inclusive.
-                    if (discardId.equals(msg.id()))
-                        break;
-                }
+            /** Skip custom messages flag. */
+            private boolean skipCustomMsg;
+
+            /** Internal iterator. */
+            private Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+            /** Next message. */
+            private TcpDiscoveryAbstractMessage next;
+
+            {
+                advance();
             }
 
-            Iterator<TcpDiscoveryAbstractMessage> customMsgIt = customMsgs.iterator();
+            /** {@inheritDoc} */
+            @Override public boolean hasNext() {
+                return next != null;
+            }
 
-            if (customDiscardId != null) {
-                while (customMsgIt.hasNext()) {
-                    TcpDiscoveryAbstractMessage msg = customMsgIt.next();
+            /** {@inheritDoc} */
+            @Override public TcpDiscoveryAbstractMessage next() {
+                if (next == null)
+                    throw new NoSuchElementException();
 
-                    // Skip all messages before discarded, inclusive.
-                    if (customDiscardId.equals(msg.id()))
-                        break;
-                }
+                TcpDiscoveryAbstractMessage next0 = next;
+
+                advance();
+
+                return next0;
             }
 
-            return F.concat(msgIt, customMsgIt);
+            /** {@inheritDoc} */
+            @Override public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+            /**
+             * Advances iterator to the next available item.
+             */
+            private void advance() {
+                next = null;
+
+                while (msgIt.hasNext()) {
+                    TcpDiscoveryAbstractMessage msg0 = msgIt.next();
+
+                    if (msg0 instanceof TcpDiscoveryCustomEventMessage) {
+                        if (skipCustomMsg) {
+                            assert customDiscardId != null;
+
+                            if (F.eq(customDiscardId, msg0.id()))
+                                skipCustomMsg = false;
+
+                            continue;
+                        }
+                    }
+                    else {
+                        if (skipMsg) {
+                            assert discardId != null;
+
+                            if (F.eq(discardId, msg0.id()))
+                                skipMsg = false;
+
+                            continue;
+                        }
+                    }
+
+                    next = msg0;
+
+                    break;
+                }
+            }
         }
     }
 
@@ -2044,9 +2086,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
-            U.debug(
-                log,
-                "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+            if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
+                U.debug(
+                    log,
+                    "Processing message [locNodeId=" + locNode.id() + ", cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
             if (debugMode)
                 debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
@@ -2399,7 +2442,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
-                                        pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+                                        pendingMsgs.discardId, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
                                         timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2447,7 +2490,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                             else
                                 prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
-                                    pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+                                    pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -2467,6 +2510,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     log.debug("Message has been sent to next node [msg=" + msg +
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
+                                U.debug(log, "Message has been sent to next node [msg=" + msg +
+                                    ", next=" + next.id() +
+                                    ", res=" + res + ']');
 
                                 if (debugMode)
                                     debugLog("Message has been sent to next node [msg=" + msg +
@@ -2588,7 +2634,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                         prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
-                            pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
+                            pendingMsgs.customDiscardId);
 
                         msgWorker.addMessage(pendingMsg);
 
@@ -3314,10 +3360,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.putAll(msg.topologyHistory());
 
                             pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
-                                msg.customMessages(), msg.discardedCustomMessageId());
+                                msg.discardedCustomMessageId());
 
                             // Clear data to minimize message size.
-                            msg.messages(null, null, null, null);
+                            msg.messages(null, null, null);
                             msg.topology(null);
                             msg.topologyHistory(null);
                             msg.clearDiscoveryData();
@@ -3423,7 +3469,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
 
-            if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
+            TcpDiscoverySpiState state = spiStateCopy();
+
+            if (msg.verified() && !locNodeId.equals(nodeId) && state != CONNECTING && fireEvt) {
                 spi.stats.onNodeJoined();
 
                 // Make sure that node with greater order will never get EVT_NODE_JOINED
@@ -3438,7 +3486,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     boolean b = ring.topologyVersion(topVer);
 
                     assert b : "Topology version has not been updated: [ring=" + ring + ", msg=" + msg +
-                        ", lastMsg=" + lastMsg + ", spiState=" + spiStateCopy() + ']';
+                        ", lastMsg=" + lastMsg + ", spiState=" + state + ']';
 
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
@@ -3450,7 +3498,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     lastMsg = msg;
                 }
 
-                notifyDiscovery(EVT_NODE_JOINED, topVer, node);
+                if (state == CONNECTED)
+                    notifyDiscovery(EVT_NODE_JOINED, topVer, node);
 
                 try {
                     if (spi.ipFinder.isShared() && locNodeCoord)
@@ -3466,7 +3515,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) {
+            if (msg.verified() && locNodeId.equals(nodeId) && state == CONNECTING) {
                 assert node != null;
 
                 assert topVer > 0 : "Invalid topology version: " + msg;
@@ -4204,7 +4253,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
-            U.debug(log, "Processing custom message: " + msg);
+            U.debug(log, "Processing custom message [msg=" + msg + ", topVer=" + ring.topologyVersion() + ']');
 
             if (isLocalNodeCoordinator()) {
                 if (!joiningNodes.isEmpty()) {
@@ -5229,7 +5278,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null);
+                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
 
                         writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());
@@ -5398,6 +5447,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (log.isDebugEnabled())
                 log.debug("Message has been added to queue: " + msg);
+
+            if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
+                U.debug(log, "Message has been added to queue: " + msg);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/47f9605f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 789f2b9..5a7146d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,9 +48,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     /** Discarded message ID. */
     private IgniteUuid discardMsgId;
 
-    /** Pending messages from previous node. */
-    private Collection<TcpDiscoveryAbstractMessage> customMsgs;
-
     /** Discarded message ID. */
     private IgniteUuid discardCustomMsgId;
 
@@ -123,15 +120,6 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
-     * Gets pending cusotm messages sent to new node by its previous.
-     *
-     * @return Pending messages from previous node.
-     */
-    @Nullable public Collection<TcpDiscoveryAbstractMessage> customMessages() {
-        return customMsgs;
-    }
-
-    /**
      * Gets discarded custom message ID.
      *
      * @return Discarded message ID.
@@ -149,12 +137,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     public void messages(
         @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
         @Nullable IgniteUuid discardMsgId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
         @Nullable IgniteUuid discardCustomMsgId
     ) {
         this.msgs = msgs;
         this.discardMsgId = discardMsgId;
-        this.customMsgs = customMsgs;
         this.discardCustomMsgId = discardCustomMsgId;
     }
 


[11/23] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4

Posted by ag...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1040872
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1040872
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1040872

Branch: refs/heads/ignite-1171
Commit: a1040872f37cf4fd1dc20584c68307f420d0d3af
Parents: 33fe30d 50f75bd
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:59:14 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:59:14 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java    | 17 +++++------------
 .../processors/cache/GridCacheProcessor.java       |  2 +-
 2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[20/23] ignite git commit: 1171-debug - Fix WIP.

Posted by ag...@apache.org.
1171-debug - Fix WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29cd3dbc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29cd3dbc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29cd3dbc

Branch: refs/heads/ignite-1171
Commit: 29cd3dbc510a14b001722de212312c915085dc4a
Parents: cb758c1
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 22 17:57:09 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 22 17:57:09 2015 -0700

----------------------------------------------------------------------
 .../ignite/internal/processors/cache/GridCacheProcessor.java    | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/29cd3dbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 3b9e9ed..74124bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1573,6 +1573,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 req.deploymentId(),
                 topVer
             );
+
+            DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+
+            if (desc != null)
+                desc.onStart();
         }
 
         // Start statically configured caches received from remote nodes during exchange.


[05/23] ignite git commit: ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing

Posted by ag...@apache.org.
ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621eb0f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621eb0f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621eb0f7

Branch: refs/heads/ignite-1171
Commit: 621eb0f75bbe1a0a623229dded38a3549309eead
Parents: 8b94494
Author: sboikov <se...@inria.fr>
Authored: Mon Sep 21 21:37:52 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon Sep 21 21:37:52 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 37 +++++++++++++-------
 .../processors/cache/GridCacheProcessor.java    |  2 +-
 .../processors/cache/GridCacheSwapManager.java  | 24 ++++++-------
 .../datastreamer/DataStreamerImpl.java          |  2 --
 4 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f2bb646..961c792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1588,6 +1588,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 boolean hasValPtr = hasOffHeapPointer();
 
+                if (old == null)
+                    old = saveValueForIndexUnlocked();
+
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
                 clearIndex(old);
@@ -2163,6 +2166,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     // Must persist inside synchronization in non-tx mode.
                     cctx.store().remove(null, keyValue(false));
 
+                if (oldVal == null)
+                    oldVal = saveValueForIndexUnlocked();
+
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
                 clearIndex(oldVal);
@@ -3342,7 +3348,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         try {
             synchronized (this) {
-                CacheObject expiredVal = saveValueForIndexUnlocked();
+                CacheObject expiredVal = saveOldValueUnlocked(false);
 
                 boolean hasOldBytes = hasOffHeapPointer();
 
@@ -3523,12 +3529,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            if (qryMgr != null && qryMgr.enabled()) {
-                qryMgr.store(key,
-                    val,
-                    ver,
-                    expireTime);
-            }
+            if (qryMgr.enabled())
+                qryMgr.store(key, val, ver, expireTime);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -3547,8 +3549,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
 
-            if (qryMgr != null)
-                qryMgr.remove(key(), prevVal == null ? null : prevVal);
+            if (qryMgr.enabled())
+                qryMgr.remove(key(), prevVal);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -3562,10 +3564,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @return Previous value or {@code null}.
      * @throws IgniteCheckedException If failed to retrieve previous value.
      */
-    protected CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+    protected final CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+        return saveOldValueUnlocked(true);
+    }
+
+    /**
+     * @param qryOnly If {@code true} reads old value only if query indexing is enabled.
+     * @return Previous value or {@code null}.
+     * @throws IgniteCheckedException If failed to retrieve previous value.
+     */
+    private CacheObject saveOldValueUnlocked(boolean qryOnly) throws IgniteCheckedException {
         assert Thread.holdsLock(this);
 
-        if (cctx.queries() == null)
+        if (qryOnly && !cctx.queries().enabled())
             return null;
 
         CacheObject val = rawGetOrUnmarshalUnlocked(false);
@@ -3681,7 +3692,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     if (obsoleteVersionExtras() != null)
                         return true;
 
-                    CacheObject prev = saveValueForIndexUnlocked();
+                    CacheObject prev = saveOldValueUnlocked(false);
 
                     if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
                         if (swap) {
@@ -3791,7 +3802,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 GridCacheQueryManager qryMgr = cctx.queries();
 
-                if (qryMgr != null)
+                if (qryMgr.enabled())
                     qryMgr.onUnswap(key, prevVal);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c92de7d..7c16136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2759,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (spaceName.equals(CU.swapSpaceName(cctx))) {
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            if (qryMgr != null) {
+            if (qryMgr.enabled()) {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 9b6381e..d9a8b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -696,12 +696,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+        if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
             return null; // Not found.
 
         swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
-                if (qryMgr == null && cctx.config().isStatisticsEnabled())
+                if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
                     cctx.cache().metrics0().onSwapRead(rmv != null);
 
                 if (rmv != null) {
@@ -843,7 +843,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridCacheSwapEntry entry;
 
-        if (qryMgr != null) {
+        if (qryMgr.enabled()) {
             entry = readOffheapBeforeRemove(key, keyBytes, part);
 
             if (entry != null) {
@@ -952,7 +952,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         ClassLoader ldr = cctx.deploy().globalLoader();
 
-        if (qryMgr != null) { // Unswap for indexing.
+        if (qryMgr.enabled()) { // Unswap for indexing.
             Iterator<SwapKey> iter = unprocessedKeys.iterator();
 
             while (iter.hasNext()) {
@@ -967,7 +967,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             unprocessedKeys,
             new IgniteBiInClosure<SwapKey, byte[]>() {
                 @Override public void apply(SwapKey swapKey, byte[] rmv) {
-                    if (qryMgr == null && cctx.config().isStatisticsEnabled())
+                    if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
                         cctx.cache().metrics0().onSwapRead(rmv != null);
 
                     if (rmv != null) {
@@ -1124,7 +1124,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      */
     public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
         throws IgniteCheckedException {
-        assert cctx.queries() != null;
+        assert cctx.queries().enabled();
 
         byte[] entryBytes = offheap.get(spaceName, part, key, keyBytes);
 
@@ -1155,7 +1155,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      */
     private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr)
         throws IgniteCheckedException {
-        assert cctx.queries() != null;
+        assert cctx.queries().enabled();
 
         byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr);
 
@@ -1196,7 +1196,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
 
-            if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+            if ((!qryMgr.enabled() || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
                 offheap.removex(spaceName, part, key, keyBytes)) {
                 if (cctx.config().isStatisticsEnabled())
                     cctx.cache().metrics0().onOffHeapRemove();
@@ -1212,7 +1212,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
             ClassLoader ldr = cctx.deploy().globalLoader();
 
-            if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+            if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
                 return; // Not found.
 
             swapMgr.remove(spaceName,
@@ -1279,7 +1279,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (qryMgr != null)
+        if (qryMgr.enabled())
             qryMgr.onSwap(key);
     }
 
@@ -1308,7 +1308,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
 
-                if (qryMgr != null)
+                if (qryMgr.enabled())
                     qryMgr.onSwap(swapEntry.key());
             }
         }
@@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.events().addEvent(batchSwapEntry.partition(), batchSwapEntry.key(), cctx.nodeId(),
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null);
 
-                    if (qryMgr != null)
+                    if (qryMgr.enabled())
                         qryMgr.onSwap(batchSwapEntry.key());
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index b5d9a7d..ab2a6e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1569,8 +1569,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                     GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 
-                    entry.unswap(false);
-
                     if (plc != null) {
                         ttl = CU.toTtl(plc.getExpiryForCreation());
 


[23/23] ignite git commit: IGNITE-1171 - Merged clean code from debug branch.

Posted by ag...@apache.org.
IGNITE-1171 - Merged clean code from debug branch.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f5dcaf35
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f5dcaf35
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f5dcaf35

Branch: refs/heads/ignite-1171
Commit: f5dcaf35eebe12aab5277502c9e1f937f72d618f
Parents: 70b26fe 1edd63d
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 22 18:36:04 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 22 18:36:04 2015 -0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheProcessor.java    |   5 +
 .../continuous/CacheContinuousQueryManager.java |   9 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 203 ++++++++++++++-----
 .../tcp/internal/TcpDiscoveryNodesRing.java     |  94 +++------
 .../messages/TcpDiscoveryDiscardMessage.java    |  15 +-
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  19 +-
 ...GridCacheValueConsistencyAtomicSelfTest.java |   2 +-
 7 files changed, 230 insertions(+), 117 deletions(-)
----------------------------------------------------------------------



[03/23] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4

Posted by ag...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8b94494a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8b94494a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8b94494a

Branch: refs/heads/ignite-1171
Commit: 8b94494a4817780961c203e0b2b52715baa3b6da
Parents: 84a2300 7ccd0b3
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Sep 18 23:16:10 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Sep 18 23:16:10 2015 +0300

----------------------------------------------------------------------
 .../src/main/dotnet/Apache.Ignite.Core/Impl/IgniteManager.cs       | 2 --
 modules/yardstick/config/ignite-base-config.xml                    | 2 +-
 2 files changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------



[15/23] ignite git commit: Check for WeakValue in GridH2AbstractKeyValueRow.onUnswap

Posted by ag...@apache.org.
Check for WeakValue in GridH2AbstractKeyValueRow.onUnswap


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca2bce00
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca2bce00
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca2bce00

Branch: refs/heads/ignite-1171
Commit: ca2bce00516142a1204fb9226c938174047e72d6
Parents: 72c3eef
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 15:04:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 15:04:27 2015 +0300

----------------------------------------------------------------------
 .../processors/query/h2/opt/GridH2AbstractKeyValueRow.java       | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ca2bce00/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index c11f541..ca5442a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,7 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
      * @throws IgniteCheckedException If failed.
      */
     public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
-        if (peekValue(VAL_COL) != null)
+        Value val0 = peekValue(VAL_COL);
+
+        if (val0 != null && !(val0 instanceof WeakValue))
             return;
 
         setValue(VAL_COL, desc.wrap(val, desc.valueType()));


[16/23] ignite git commit: Added test.

Posted by ag...@apache.org.
Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1942d758
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1942d758
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1942d758

Branch: refs/heads/ignite-1171
Commit: 1942d75856ab6d317b743de71b53a29abf81316a
Parents: ca2bce0
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 17:36:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 17:36:18 2015 +0300

----------------------------------------------------------------------
 .../IgniteCachePutRetryAbstractSelfTest.java    | 33 ++++++++++++++++++++
 1 file changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1942d758/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 943caeb..76f12c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -168,6 +168,13 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
     /**
      * @throws Exception If failed.
      */
+    public void testGetAndPut() throws Exception {
+        checkRetry(Test.GET_AND_PUT, TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testPutStoreEnabled() throws Exception {
         checkRetry(Test.PUT, TestMemoryMode.HEAP, true);
     }
@@ -275,6 +282,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
                     break;
                 }
 
+                case GET_AND_PUT: {
+                    for (int i = 0; i < keysCnt; i++)
+                        cache.put(i, 0);
+
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer expOld = iter;
+
+                        Integer val = ++iter;
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            Integer old = cache.getAndPut(i, val);
+
+                            assertTrue("Unexpected old value [old=" + old + ", exp=" + expOld + ']',
+                                expOld.equals(old) || val.equals(old));
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
+
                 case PUT_ALL: {
                     while (System.currentTimeMillis() < stopTime) {
                         Integer val = ++iter;
@@ -495,6 +525,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
         PUT,
 
         /** */
+        GET_AND_PUT,
+
+        /** */
         PUT_ALL,
 
         /** */


[02/23] ignite git commit: minor (fixed warning messages)

Posted by ag...@apache.org.
minor (fixed warning messages)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84a23008
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84a23008
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84a23008

Branch: refs/heads/ignite-1171
Commit: 84a230084e9488a4ddb9b52f86ecd6c623baf745
Parents: d64fc9d
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Sep 18 23:15:59 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Sep 18 23:15:59 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/spi/discovery/tcp/ServerImpl.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/84a23008/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3e50b94..4ce46e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4437,9 +4437,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         LT.warn(log, null, "Unknown connection detected (is some other software connecting to " +
                             "this Ignite port?" +
-                            (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) +
-                            ") [rmtAddr=" + sock.getRemoteSocketAddress() +
-                            ", locAddr=" + sock.getLocalSocketAddress() + ']');
+                            (!spi.isSslEnabled() ? " missing SSL configuration on remote node?" : "" ) +
+                            ") [rmtAddr=" + sock.getInetAddress() + ']', true);
 
                         return;
                     }
@@ -4555,8 +4554,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                         U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
 
                     if (X.hasCause(e, SSLException.class) && spi.isSslEnabled() && !spi.isNodeStopping0())
-                        LT.warn(log, null, "Failed to initialize connection. Not encrypted data received. " +
-                            "Missed SSL configuration on node? [sock=" + sock + ']');
+                        LT.warn(log, null, "Failed to initialize connection " +
+                            "(missing SSL configuration on remote node?) " +
+                            "[rmtAddr=" + sock.getInetAddress() + ']', true);
                     else if ((X.hasCause(e, ObjectStreamException.class) || !sock.isClosed())
                         && !spi.isNodeStopping0()) {
                         if (U.isMacInvalidArgumentError(e))
@@ -5293,4 +5293,4 @@ class ServerImpl extends TcpDiscoveryImpl {
             this.sock = sock;
         }
     }
-}
\ No newline at end of file
+}


[14/23] ignite git commit: ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue

Posted by ag...@apache.org.
ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72c3eef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72c3eef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72c3eef2

Branch: refs/heads/ignite-1171
Commit: 72c3eef2aa31df4a68b46a8877809cc0f49c1368
Parents: 39dace4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 13:51:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 13:51:09 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  8 +--
 .../processors/cache/GridCacheMapEntry.java     | 14 ++---
 .../processors/cache/GridCacheProcessor.java    |  6 +--
 .../cache/GridCacheSwapEntryImpl.java           | 31 +++++++++--
 .../processors/cache/GridCacheSwapManager.java  | 56 +++++++++++++-------
 .../processors/query/h2/IgniteH2Indexing.java   | 19 ++++---
 .../query/h2/opt/GridH2AbstractKeyValueRow.java | 49 ++++++++++-------
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  | 11 +++-
 .../query/h2/opt/GridH2RowDescriptor.java       |  5 ++
 .../cache/CacheIndexStreamerTest.java           | 33 +++++++++---
 .../processors/cache/GridCacheSwapSelfTest.java |  4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |  2 +
 12 files changed, 158 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..ae987b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -805,9 +805,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 if (modes.offheap || modes.swap) {
                     GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
-                    GridCacheSwapEntry swapEntry = swapMgr.read(cacheKey, modes.offheap, modes.swap);
-
-                    cacheVal = swapEntry != null ? swapEntry.value() : null;
+                    cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
                 }
             }
             else
@@ -856,9 +854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (offheap || swap) {
             GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
-            GridCacheSwapEntry swapEntry = swapMgr.read(key, offheap, swap);
-
-            return swapEntry != null ? swapEntry.value() : null;
+            return swapMgr.readValue(key, offheap, swap);
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 961c792..4bf0aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -512,7 +512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
                 else
-                    e = detached() ? cctx.swap().read(this, true, true, true) : cctx.swap().readAndRemove(this);
+                    e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);
 
                 if (log.isDebugEnabled())
                     log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']');
@@ -2840,7 +2840,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (offheap || swap) {
-                GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap);
+                GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
 
                 return e != null ? e.value() : null;
             }
@@ -3581,14 +3581,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         CacheObject val = rawGetOrUnmarshalUnlocked(false);
 
-        if (val == null) {
-            GridCacheSwapEntry swapEntry = cctx.swap().read(key, true, true);
-
-            if (swapEntry == null)
-                return null;
-
-            return swapEntry.value();
-        }
+        if (val == null)
+            val = cctx.swap().readValue(key, true, true);
 
         return val;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9c325aa..e92ea57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2763,14 +2763,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 
-                    GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes);
+                    GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
 
                     CacheObject val = swapEntry.value();
 
-                    if (val == null)
-                        val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), swapEntry.type(),
-                            swapEntry.valueBytes());
-
                     assert val != null;
 
                     qryMgr.remove(key, val);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index b7c66d3..6b1266f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -94,8 +94,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
         long expireTime,
         @Nullable IgniteUuid keyClsLdrId,
         @Nullable IgniteUuid valClsLdrId) {
-        assert ver != null;
-
         this.valBytes = valBytes;
         this.type = type;
         this.ver = ver;
@@ -268,9 +266,36 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
 
     /**
      * @param arr Entry bytes.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Entry.
      */
-    public static GridCacheSwapEntryImpl unmarshal(byte[] arr) {
+    public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
+        if (valOnly) {
+            long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+
+            boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+
+            off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
+
+            int arrLen = UNSAFE.getInt(arr, off);
+
+            off += 4;
+
+            byte type = UNSAFE.getByte(arr, off++);
+
+            byte[] valBytes = new byte[arrLen];
+
+            UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+
+            return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
+                type,
+                null,
+                0L,
+                0L,
+                null,
+                null);
+        }
+
         long off = BYTE_ARR_OFF;
 
         long ttl = UNSAFE.getLong(arr, off);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d9a8b5c..2ab7b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -569,6 +569,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param entryLocked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
      * @param readSwap Read swap flag.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Value from swap or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
@@ -578,7 +579,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         int part,
         boolean entryLocked,
         boolean readOffheap,
-        boolean readSwap)
+        boolean readSwap,
+        boolean valOnly)
         throws IgniteCheckedException
     {
         assert readOffheap || readSwap;
@@ -605,7 +607,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.cache().metrics0().onOffHeapRead(bytes != null);
 
                 if (bytes != null)
-                    return swapEntry(unmarshalSwapEntry(bytes));
+                    return swapEntry(unmarshalSwapEntry(bytes, valOnly));
             }
 
             if (!swapEnabled || !readSwap)
@@ -620,7 +622,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             if (bytes == null && lsnr != null)
                 return lsnr.entry;
 
-            return bytes != null ? swapEntry(unmarshalSwapEntry(bytes)) : null;
+            return bytes != null ? swapEntry(unmarshalSwapEntry(bytes, valOnly)) : null;
         }
         finally {
             if (lsnr != null)
@@ -706,7 +708,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 if (rmv != null) {
                     try {
-                        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+                        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
 
                         if (entry == null)
                             return;
@@ -756,20 +758,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param locked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
      * @param readSwap Read swap flag.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Read value.
      * @throws IgniteCheckedException If read failed.
      */
     @Nullable GridCacheSwapEntry read(GridCacheEntryEx entry,
         boolean locked,
         boolean readOffheap,
-        boolean readSwap)
+        boolean readSwap,
+        boolean valOnly)
         throws IgniteCheckedException
     {
         if (!offheapEnabled && !swapEnabled)
             return null;
 
         return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
-            readOffheap, readSwap);
+            readOffheap, readSwap, valOnly);
     }
 
     /**
@@ -805,7 +809,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Read value.
      * @throws IgniteCheckedException If read failed.
      */
-    @Nullable public GridCacheSwapEntry read(KeyCacheObject key,
+    @Nullable public CacheObject readValue(KeyCacheObject key,
         boolean readOffheap,
         boolean readSwap)
         throws IgniteCheckedException
@@ -815,7 +819,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap);
+        GridCacheSwapEntry swapEntry = read(key,
+            key.valueBytes(cctx.cacheObjectContext()),
+            part,
+            false,
+            readOffheap,
+            readSwap,
+            true);
+
+        assert swapEntry == null || swapEntry.value() != null : swapEntry;
+
+        return swapEntry != null ? swapEntry.value() : null;
     }
 
     /**
@@ -865,7 +879,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.cache().metrics0().onOffHeapRemove();
             }
 
-            entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes));
+            entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes, false));
         }
 
         return entry;
@@ -972,7 +986,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     if (rmv != null) {
                         try {
-                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
 
                             if (entry == null)
                                 return;
@@ -1078,7 +1092,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
 
             if (lsnrs != null) {
-                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
 
                 for (GridCacheSwapListener lsnr : lsnrs)
                     lsnr.onEntryUnswapped(part, key, e);
@@ -1132,7 +1146,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
 
         if (entryBytes != null) {
-            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, false));
 
             if (entry != null) {
                 cctx.queries().onUnswap(key, entry.value());
@@ -1165,7 +1179,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (entryBytes == null)
             return false;
 
-        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, true));
 
         if (entry == null)
             return false;
@@ -2063,7 +2077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 try {
                     for (Map.Entry<byte[], byte[]> e : iter) {
                         try {
-                            GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue());
+                            GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue(), false);
 
                             IgniteUuid valLdrId = swapEntry.valueClassLoaderId();
 
@@ -2120,10 +2134,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param bytes Bytes to unmarshal.
+     * @param valOnly If {@code true} unmarshals only value.
      * @return Unmarshalled entry.
      */
-    private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes) {
-        return GridCacheSwapEntryImpl.unmarshal(bytes);
+    private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes, boolean valOnly) {
+        return GridCacheSwapEntryImpl.unmarshal(bytes, valOnly);
     }
 
     /**
@@ -2169,7 +2184,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
             Map.Entry<byte[], byte[]> e = iter.nextX();
 
-            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue());
+            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
 
             return F.t(e.getKey(), swapEntry(unmarshalled));
         }
@@ -2446,6 +2461,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
     }
 
+    /**
+     *
+     */
     private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware {
         /** */
         private Map.Entry<byte[], byte[]> entry;
@@ -2474,7 +2492,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override public V getValue() {
             try {
-                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
 
                 swapEntry(e);
 
@@ -2487,7 +2505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         /** {@inheritDoc} */
         @Override public GridCacheVersion version() {
-            GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+            GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
 
             return e.version();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2af1386..8595187 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
@@ -2108,6 +2107,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         /** */
         private final GridUnsafeGuard guard;
 
+        /** */
+        private final boolean preferSwapVal;
+
         /**
          * @param type Type descriptor.
          * @param schema Schema.
@@ -2136,6 +2138,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             keyType = DataType.getTypeFromClass(type.keyClass());
             valType = DataType.getTypeFromClass(type.valueClass());
+
+            preferSwapVal = schema.ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED;
         }
 
         /** {@inheritDoc} */
@@ -2263,15 +2267,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (cctx.isNear())
                 cctx = cctx.near().dht().context();
 
-            GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
+            CacheObject v = cctx.swap().readValue(cctx.toCacheKeyObject(key), true, true);
 
-            if (e == null)
+            if (v == null)
                 return null;
 
-            CacheObject v = e.value();
-
-            assert v != null : "swap must unmarshall it for us";
-
             return v.value(cctx.cacheObjectContext(), false);
         }
 
@@ -2312,5 +2312,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             return new GridH2KeyValueRowOffheap(this, ptr);
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean preferSwapValue() {
+            return preferSwapVal;
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a16284..c11f541 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -130,20 +130,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
     /**
      * Atomically updates weak value.
      *
-     * @param upd New value.
-     * @return {@code null} If update succeeded, unexpected value otherwise.
+     * @param valObj New value.
+     * @return New value if old value is empty, old value otherwise.
+     * @throws IgniteCheckedException If failed.
      */
-    protected synchronized Value updateWeakValue(Value upd) {
+    protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
         Value res = peekValue(VAL_COL);
 
         if (res != null && !(res instanceof WeakValue))
             return res;
 
+        Value upd = desc.wrap(valObj, desc.valueType());
+
         setValue(VAL_COL, new WeakValue(upd));
 
         notifyAll();
 
-        return null;
+        return upd;
     }
 
     /**
@@ -188,21 +191,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
             Value v;
 
             if (col == VAL_COL) {
-                v = syncValue(0);
+                v = peekValue(VAL_COL);
 
                 long start = 0;
                 int attempt = 0;
 
                 while ((v = WeakValue.unwrap(v)) == null) {
-                    v = getOffheapValue(VAL_COL);
+                    if (!desc.preferSwapValue()) {
+                        v = getOffheapValue(VAL_COL);
 
-                    if (v != null) {
-                        setValue(VAL_COL, v);
+                        if (v != null) {
+                            setValue(VAL_COL, v);
 
-                        if (peekValue(KEY_COL) == null)
-                            cache();
+                            if (peekValue(KEY_COL) == null)
+                                cache();
 
-                        return v;
+                            return v;
+                        }
                     }
 
                     Object k = getValue(KEY_COL).getObject();
@@ -213,16 +218,24 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
                         if (valObj != null) {
                             // Even if we've found valObj in swap, it is may be some new value,
                             // while the needed value was already unswapped, so we have to recheck it.
-                            if ((v = WeakValue.unwrap(syncValue(0))) == null && (v = getOffheapValue(VAL_COL)) == null) {
-                                Value upd = desc.wrap(valObj, desc.valueType());
-
-                                v = updateWeakValue(upd);
-
-                                return v == null ? upd : v;
-                            }
+                            if ((v = getOffheapValue(VAL_COL)) == null)
+                                return updateWeakValue(valObj);
                         }
                         else {
                             // If nothing found in swap then we should be already unswapped.
+                            if (desc.preferSwapValue()) {
+                                v = getOffheapValue(VAL_COL);
+
+                                if (v != null) {
+                                    setValue(VAL_COL, v);
+
+                                    if (peekValue(KEY_COL) == null)
+                                        cache();
+
+                                    return v;
+                                }
+                            }
+
                             v = syncValue(attempt);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index de31fe1..2dd9f25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -216,12 +216,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
 
     /** {@inheritDoc} */
     @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
-    @Override protected synchronized Value updateWeakValue(Value upd) {
+    @Override protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
+        Value val = peekValue(VAL_COL);
+
+        if (val != null)
+            return val;
+
+        Value upd = desc.wrap(valObj, desc.valueType());
+
         setValue(VAL_COL, upd);
 
         notifyAll();
 
-        return null;
+        return upd;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 0edd102..ed3ff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -111,4 +111,9 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
      * @throws IgniteCheckedException If failed.
      */
     public Value wrap(Object o, int type) throws IgniteCheckedException;
+
+    /**
+     * @return {@code True} if the swap value should be checked before the offheap value.
+     */
+    public boolean preferSwapValue();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 23f4e91..e6bf22b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -36,6 +36,8 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -45,7 +47,6 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     /** */
     private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,14 +61,29 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testStreamer() throws Exception {
+    public void testStreamerAtomic() throws Exception {
+        checkStreamer(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStreamerTx() throws Exception {
+        checkStreamer(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception {
         final Ignite ignite = startGrid(0);
 
-        final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+        final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration(atomicityMode));
 
         final AtomicBoolean stop = new AtomicBoolean();
 
-        final int KEYS= 10_000;
+        final int KEYS = 10_000;
 
         try {
             IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
@@ -118,14 +134,15 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param atomicityMode Cache atomicity mode.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration() {
+    private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) {
         CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setAtomicityMode(atomicityMode);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+        ccfg.setMemoryMode(OFFHEAP_TIERED);
         ccfg.setOffHeapMaxMemory(0);
         ccfg.setBackups(1);
         ccfg.setIndexedTypes(Integer.class, String.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
index e0e6ff0..cd1fc93 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
@@ -244,12 +244,12 @@ public class GridCacheSwapSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * TODO: IGNITE-599.
-     *
      * @throws Exception If failed.
      */
     public void testSwapEviction() throws Exception {
         try {
+            fail("https://issues.apache.org/jira/browse/IGNITE-599");
+
             final CountDownLatch evicted = new CountDownLatch(10);
 
             startGrids(1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index f30f70e..550c69f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
+import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -63,6 +64,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
 
         suite.addTestSuite(GridCacheOffheapIndexGetSelfTest.class);
         suite.addTestSuite(GridCacheOffheapIndexEntryEvictTest.class);
+        suite.addTestSuite(CacheIndexStreamerTest.class);
 
         suite.addTestSuite(CacheConfigurationP2PTest.class);
 


[08/23] ignite git commit: IGNITE-1522 - Made cache entry listener configurations transient in cache configuration

Posted by ag...@apache.org.
IGNITE-1522 - Made cache entry listener configurations transient in cache configuration


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e51fb420
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e51fb420
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e51fb420

Branch: refs/heads/ignite-1171
Commit: e51fb420d1284465c7cbe55a28c2374ddf67d495
Parents: 621eb0f
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Mon Sep 21 23:29:20 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Mon Sep 21 23:29:20 2015 -0700

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       | 15 +++++
 .../IgniteCacheEntryListenerAbstractTest.java   | 65 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 7d1e14d..44a3fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashSet;
 import javax.cache.Cache;
 import javax.cache.configuration.CompleteConfiguration;
 import javax.cache.configuration.Factory;
@@ -1799,6 +1800,20 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return this;
     }
 
+    /**
+     * Creates a copy of current configuration and removes all cache entry listeners.
+     * They are executed only locally and should never be sent to remote nodes.
+     *
+     * @return Configuration object that will be serialized.
+     */
+    protected Object writeReplace() {
+        CacheConfiguration<K, V> cfg = new CacheConfiguration<>(this);
+
+        cfg.listenerConfigurations = new HashSet<>();
+
+        return cfg;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 78a6700..3fdd7fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,11 +36,13 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryExpiredListener;
 import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryRemovedListener;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
@@ -358,6 +364,34 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testSerialization() throws Exception {
+        if (cacheMode() == LOCAL)
+            return;
+
+        AtomicBoolean serialized = new AtomicBoolean();
+
+        NonSerializableListener lsnr = new NonSerializableListener(serialized);
+
+        jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
+            FactoryBuilder.factoryOf(lsnr),
+            null,
+            true,
+            false
+        ));
+
+        try {
+            startGrid(gridCount());
+        }
+        finally {
+            stopGrid(gridCount());
+        }
+
+        assertFalse(serialized.get());
+    }
+
+    /**
      * @param key Key.
      * @param val Value.
      * @param cache Cache.
@@ -1190,4 +1224,33 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
     }
 
-}
\ No newline at end of file
+    /**
+     */
+    public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
+        /** */
+        private final AtomicBoolean serialized;
+
+        /**
+         * @param serialized Serialized flag.
+         */
+        public NonSerializableListener(AtomicBoolean serialized) {
+            this.serialized = serialized;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> evts)
+            throws CacheEntryListenerException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            serialized.set(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // No-op.
+        }
+    }
+}


[13/23] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4

Posted by ag...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39dace45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39dace45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39dace45

Branch: refs/heads/ignite-1171
Commit: 39dace45c81aef7cb913fcf4f98a7d71e34beebd
Parents: f0be45e a104087
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:38:21 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:38:21 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java |  17 +--
 .../processors/cache/GridCacheProcessor.java    |   2 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 4 files changed, 125 insertions(+), 25 deletions(-)
----------------------------------------------------------------------



[06/23] ignite git commit: 1171-debug

Posted by ag...@apache.org.
1171-debug


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72baa620
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72baa620
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72baa620

Branch: refs/heads/ignite-1171
Commit: 72baa6205f0bc9439036730bde585e6b179255a7
Parents: 19e34f6 46a22e3
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 19:20:55 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 19:20:55 2015 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 69 ++++++++------------
 .../Apache.Ignite.Core/Impl/IgniteManager.cs    |  2 -
 modules/yardstick/config/ignite-base-config.xml |  2 +-
 3 files changed, 29 insertions(+), 44 deletions(-)
----------------------------------------------------------------------



[10/23] ignite git commit: Added test.

Posted by ag...@apache.org.
Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33fe30da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33fe30da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33fe30da

Branch: refs/heads/ignite-1171
Commit: 33fe30da620e4f08cee959104805f3527b597700
Parents: e51fb42
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:55:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:55:18 2015 +0300

----------------------------------------------------------------------
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 2 files changed, 119 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 66275b3..14417c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -21,12 +21,25 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
 /**
  * Client-based discovery SPI test with failure detection timeout enabled.
  */
@@ -60,7 +73,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
 
     /** {@inheritDoc} */
     @Override protected TcpDiscoverySpi getDiscoverySpi() {
-        return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi();
+        return useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
     }
 
     /**
@@ -117,16 +130,16 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
     private void checkFailureThresholdWorkability() throws Exception {
         useTestSpi = true;
 
-        TestTcpDiscoverySpi firstSpi = null;
-        TestTcpDiscoverySpi secondSpi = null;
+        TestTcpDiscoverySpi2 firstSpi = null;
+        TestTcpDiscoverySpi2 secondSpi = null;
 
         try {
             startServerNodes(2);
 
             checkNodes(2, 0);
 
-            firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi());
-            secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi());
+            firstSpi = (TestTcpDiscoverySpi2)(G.ignite("server-0").configuration().getDiscoverySpi());
+            secondSpi = (TestTcpDiscoverySpi2)(G.ignite("server-1").configuration().getDiscoverySpi());
 
             assert firstSpi.err == null;
 
@@ -157,9 +170,102 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
     }
 
     /**
+     * Tries to provoke the scenario in which a client sends a reconnect message before the router failure is detected.
+     *
+     * @throws Exception If failed.
+     */
+    public void _testClientReconnectOnCoordinatorRouterFail() throws Exception {
+        startServerNodes(1);
+
+        Ignite srv = G.ignite("server-0");
+
+        final TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+        final UUID srvNodeId = srvNode.id();
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        clientIpFinder.setAddresses(
+            Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1)));
+
+        failureThreshold = 1000L;
+        netTimeout = 500L;
+
+        startClientNodes(1); // Client should connect to coordinator.
+
+        failureThreshold = 10_000L;
+        netTimeout = 5000L;
+
+        for (int i = 0; i < 2; i++) {
+            Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+            srvNodeIds.add(g.cluster().localNode().id());
+        }
+
+        checkNodes(3, 1);
+
+        final CountDownLatch latch = new CountDownLatch(3);
+
+        String nodes[] = {"server-1", "server-2", "client-0"};
+
+        final AtomicBoolean err = new AtomicBoolean();
+
+        for (String node : nodes) {
+            G.ignite(node).events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    DiscoveryEvent disoEvt = (DiscoveryEvent)evt;
+
+                    if (disoEvt.eventNode().id().equals(srvNodeId)) {
+                        info("Expected node failed event: " + ((DiscoveryEvent) evt).eventNode());
+
+                        latch.countDown();
+                    }
+                    else {
+                        log.info("Unexpected node failed event: " + evt);
+
+                        err.set(true);
+                    }
+
+                    return true;
+                }
+            }, EVT_NODE_FAILED);
+        }
+
+        Thread.sleep(5000);
+
+        Ignite client = G.ignite("client-0");
+
+        UUID nodeId = client.cluster().localNode().id();
+
+        log.info("Fail coordinator: " + srvNodeId);
+
+        TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi();
+
+        srvSpi.pauseAll(false);
+
+        try {
+            Thread.sleep(2000);
+        }
+        finally {
+            srvSpi.simulateNodeFailure();
+            srvSpi.resumeAll();
+        }
+
+        try {
+            assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
+
+            assertFalse("Unexpected event, see log for details.", err.get());
+            assertEquals(nodeId, client.cluster().localNode().id());
+        }
+        finally {
+            srvSpi.resumeAll();
+        }
+    }
+
+    /**
      *
      */
-    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi {
         /** */
         private long readDelay;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index c86f06a..9fbf5b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -89,13 +89,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final AtomicInteger srvIdx = new AtomicInteger();
+    protected static final AtomicInteger srvIdx = new AtomicInteger();
 
     /** */
     private static final AtomicInteger clientIdx = new AtomicInteger();
 
     /** */
-    private static Collection<UUID> srvNodeIds;
+    protected static Collection<UUID> srvNodeIds;
 
     /** */
     private static Collection<UUID> clientNodeIds;
@@ -128,13 +128,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private UUID nodeId;
 
     /** */
-    private TcpDiscoveryVmIpFinder clientIpFinder;
+    protected TcpDiscoveryVmIpFinder clientIpFinder;
 
     /** */
     private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
 
     /** */
-    private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+    protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
 
     /** */
     private boolean longSockTimeouts;
@@ -466,7 +466,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             @Override public void apply(Socket sock) {
                 try {
                     latch.await();
-                } catch (InterruptedException e) {
+                }
+                catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
             }
@@ -2056,7 +2057,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
         /** */
         private final Object mux = new Object();
 


[19/23] ignite git commit: Merge branch 'ignite-1.4' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171-debug

Posted by ag...@apache.org.
Merge branch 'ignite-1.4' of https://git-wip-us.apache.org/repos/asf/ignite into ignite-1171-debug


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb758c1d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb758c1d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb758c1d

Branch: refs/heads/ignite-1171
Commit: cb758c1d79377fd4c352f6711d95f35ba62b4df6
Parents: 47f9605 1942d75
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 22 17:45:22 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 22 17:45:22 2015 -0700

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java |  17 +--
 .../configuration/CacheConfiguration.java       |  15 +++
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheMapEntry.java     |  51 ++++----
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../cache/GridCacheSwapEntryImpl.java           |  31 ++++-
 .../processors/cache/GridCacheSwapManager.java  |  80 ++++++++-----
 .../datastreamer/DataStreamerImpl.java          |   2 -
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  10 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |  65 +++++++++-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  33 ++++++
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  19 +--
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  54 ++++++---
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  11 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   5 +
 .../processors/query/h2/opt/GridH2Table.java    |  10 +-
 .../cache/CacheIndexStreamerTest.java           |  37 ++++--
 .../processors/cache/GridCacheSwapSelfTest.java |   4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 21 files changed, 453 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cb758c1d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb758c1d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------


[21/23] ignite git commit: 1171-debug - Removed debug.

Posted by ag...@apache.org.
1171-debug - Removed debug.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1edd63da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1edd63da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1edd63da

Branch: refs/heads/ignite-1171
Commit: 1edd63da2f73ccb8dbd51f2f9e1285b7c6b79a99
Parents: 29cd3db
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 22 18:29:56 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 22 18:29:56 2015 -0700

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryManager.java | 15 +-----
 .../ignite/internal/util/IgniteUtils.java       |  3 --
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 52 ++------------------
 3 files changed, 5 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1edd63da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index eaa66af..32637b30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -449,23 +449,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             taskNameHash,
             skipPrimaryCheck);
 
-        IgniteInternalFuture<UUID> f = cctx.kernalContext().continuous().startRoutine(
+        UUID id = cctx.kernalContext().continuous().startRoutine(
             hnd,
             bufSize,
             timeInterval,
             autoUnsubscribe,
-            grp.predicate());
-
-        while (!f.isDone()) {
-            try {
-                f.get(2000);
-            }
-            catch (Exception e) {
-                U.debug(log, "### Failed to wait for future: " + cctx.gridName() + " " + cctx.nodeId() + " " + f);
-            }
-        }
-
-        UUID id = f.get();
+            grp.predicate()).get();
 
         if (notifyExisting) {
             final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator();

http://git-wip-us.apache.org/repos/asf/ignite/blob/1edd63da/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index e730edc..e5090cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -1022,9 +1022,6 @@ public abstract class IgniteUtils {
      */
     @Deprecated
     public static void debug(IgniteLogger log, String msg) {
-        if (true)
-            return;
-
         log.info(msg);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/1edd63da/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 701e31e..0c2c6e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2086,11 +2086,6 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
-            if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
-                U.debug(
-                    log,
-                    "Processing message [locNodeId=" + locNode.id() + ", cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
-
             if (debugMode)
                 debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
@@ -2464,12 +2459,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
-                                    U.debug(
-                                        log,
-                                        "Pending message has been sent to next node [msgId=" + msg.id() +
-                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
-                                            ", res=" + res + ']');
-
                                     if (debugMode)
                                         debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
@@ -2510,9 +2499,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     log.debug("Message has been sent to next node [msg=" + msg +
                                         ", next=" + next.id() +
                                         ", res=" + res + ']');
-                                U.debug(log, "Message has been sent to next node [msg=" + msg +
-                                    ", next=" + next.id() +
-                                    ", res=" + res + ']');
 
                                 if (debugMode)
                                     debugLog("Message has been sent to next node [msg=" + msg +
@@ -3011,8 +2997,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
                     node, msg.discoveryData(), spi.gridStartTime);
 
-                U.debug(log, "Create node added msg: " + nodeAddedMsg);
-
                 nodeAddedMsg.client(msg.client());
 
                 processNodeAddedMessage(nodeAddedMsg);
@@ -3235,9 +3219,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.add(node.id());
 
-                U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() +
-                        ", msg=" + msg.id() + ']');
-
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
 
@@ -3467,8 +3448,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             joiningNodes.remove(nodeId);
 
-            U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
-
             TcpDiscoverySpiState state = spiStateCopy();
 
             if (msg.verified() && !locNodeId.equals(nodeId) && state != CONNECTING && fireEvt) {
@@ -3491,10 +3470,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
 
-                    U.debug(
-                        log,
-                        "Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
-
                     lastMsg = msg;
                 }
 
@@ -3655,10 +3630,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
 
-                    U.debug(
-                        log,
-                        "Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
-
                     lastMsg = msg;
                 }
 
@@ -3695,8 +3666,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(leftNode.id());
 
-                U.debug(log, "Joining nodes remove2: " + joiningNodes + ", node=" + leftNode.id());
-
                 spi.stats.onNodeLeft();
 
                 notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3831,10 +3800,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
 
-                    U.debug(
-                        log,
-                        "Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
-
                     lastMsg = msg;
                 }
 
@@ -3859,8 +3824,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(node.id());
 
-                U.debug(log, "Joining nodes remove3: " + joiningNodes + ", node=" + node.id());
-
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();
@@ -4253,8 +4216,6 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
-            U.debug(log, "Processing custom message [msg=" + msg + ", topVer=" + ring.topologyVersion() + ']');
-
             if (isLocalNodeCoordinator()) {
                 if (!joiningNodes.isEmpty()) {
                     pendingCustomMsgs.add(msg);
@@ -4265,8 +4226,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 boolean sndNext = !msg.verified();
 
                 if (sndNext) {
-                    U.debug(log, "Joining nodes are empty on coordinator, will proceed with message: " + msg);
-
                     msg.verify(getLocalNodeId());
                     msg.topologyVersion(ring.topologyVersion());
 
@@ -4315,7 +4274,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 if (msg.verified() && msg.topologyVersion() != ring.topologyVersion()) {
-                    U.debug(log, "Discarding custom message: " + msg + ", topver=" + ring.topologyVersion());
+                    if (log.isDebugEnabled())
+                        log.debug("Discarding custom event message [msg=" + msg + ", ring=" + ring + ']');
 
                     return;
                 }
@@ -4328,11 +4288,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                     notifyDiscoveryListener(msg);
                 }
 
-                if (ring.hasRemoteNodes()) {
-                    U.debug(log, "Will send message to the next node in topology [next=" + next + ", msg=" + msg + ']');
-
+                if (ring.hasRemoteNodes())
                     sendMessageAcrossRing(msg);
-                }
             }
         }
 
@@ -5447,9 +5404,6 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (log.isDebugEnabled())
                 log.debug("Message has been added to queue: " + msg);
-
-            if (!(msg instanceof TcpDiscoveryHeartbeatMessage))
-                U.debug(log, "Message has been added to queue: " + msg);
         }
 
         /**


[09/23] ignite git commit: Cleaned documentation. Set ATOMIC mode as default using specific constant.

Posted by ag...@apache.org.
Cleaned documentation. Set ATOMIC mode as default using specific constant.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50f75bd6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50f75bd6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50f75bd6

Branch: refs/heads/ignite-1171
Commit: 50f75bd6111b5b9163391e4c0913ff5b696a2862
Parents: e51fb42
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Sep 22 11:44:22 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Sep 22 11:44:22 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java    | 17 +++++------------
 .../processors/cache/GridCacheProcessor.java       |  2 +-
 2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
index 9e0f81e..92b5aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
@@ -33,11 +33,6 @@ public enum CacheAtomicityMode {
     /**
      * Specified fully {@code ACID}-compliant transactional cache behavior. See
      * {@link Transaction} for more information about transactions.
-     * <p>
-     * This mode is currently the default cache atomicity mode. However, cache
-     * atomicity mode will be changed to {@link #ATOMIC} starting from version {@code 5.2},
-     * so it is recommended that desired atomicity mode is explicitly configured
-     * instead of relying on default value.
      */
     TRANSACTIONAL,
 
@@ -49,18 +44,16 @@ public enum CacheAtomicityMode {
      * In addition to transactions and locking, one of the main differences in {@code ATOMIC} mode
      * is that bulk writes, such as {@code putAll(...)}, {@code removeAll(...)}, and {@code transformAll(...)}
      * methods, become simple batch operations which can partially fail. In case of partial
-     * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown which will contain a list of keys
-     * for which the update failed. It is recommended that bulk writes are used whenever multiple keys
-     * need to be inserted or updated in cache, as they reduce number of network trips and provide
-     * better performance.
+     * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown
+     * which will contain a list of keys for which the update failed. It is recommended that bulk writes are used
+     * whenever multiple keys need to be inserted or updated in cache, as they reduce number of network trips and
+     * provide better performance.
      * <p>
      * Note that even without locking and transactions, {@code ATOMIC} mode still provides
      * full consistency guarantees across all cache nodes.
      * <p>
      * Also note that all data modifications in {@code ATOMIC} mode are guaranteed to be atomic
      * and consistent with writes to the underlying persistent store, if one is configured.
-     * <p>
-     * This mode is currently implemented for {@link CacheMode#PARTITIONED} caches only.
      */
     ATOMIC;
 
@@ -76,4 +69,4 @@ public enum CacheAtomicityMode {
     @Nullable public static CacheAtomicityMode fromOrdinal(int ord) {
         return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7c16136..9c325aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -271,7 +271,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cfg.setRebalanceMode(ASYNC);
 
         if (cfg.getAtomicityMode() == null)
-            cfg.setAtomicityMode(ATOMIC);
+            cfg.setAtomicityMode(CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE);
 
         if (cfg.getWriteSynchronizationMode() == null)
             cfg.setWriteSynchronizationMode(PRIMARY_SYNC);


[17/23] ignite git commit: debugging

Posted by ag...@apache.org.
debugging


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/271b7501
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/271b7501
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/271b7501

Branch: refs/heads/ignite-1171
Commit: 271b7501cfe53b046a69c3930a9bed3ed4dffb12
Parents: af6deb8
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Tue Sep 22 19:36:33 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Tue Sep 22 19:36:33 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 37 +++++++-
 .../tcp/internal/TcpDiscoveryNodesRing.java     | 94 +++++++-------------
 ...GridCacheValueConsistencyAtomicSelfTest.java |  2 +-
 3 files changed, 67 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/271b7501/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 69dd512..3d624d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2044,6 +2044,10 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (log.isDebugEnabled())
                 log.debug("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
+            U.debug(
+                log,
+                "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+
             if (debugMode)
                 debugLog("Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
 
@@ -2417,6 +2421,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
                                             ", res=" + res + ']');
 
+                                    U.debug(
+                                        log,
+                                        "Pending message has been sent to next node [msgId=" + msg.id() +
+                                            ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
+                                            ", res=" + res + ']');
+
                                     if (debugMode)
                                         debugLog("Pending message has been sent to next node [msgId=" + msg.id() +
                                             ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() +
@@ -2955,6 +2965,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
                     node, msg.discoveryData(), spi.gridStartTime);
 
+                U.debug(log, "Create node added msg: " + nodeAddedMsg);
+
                 nodeAddedMsg.client(msg.client());
 
                 processNodeAddedMessage(nodeAddedMsg);
@@ -3177,7 +3189,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.add(node.id());
 
-                U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() + ']');
+                U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() +
+                        ", msg=" + msg.id() + ']');
 
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
@@ -3430,6 +3443,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
 
+                    U.debug(
+                        log,
+                        "Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
+
                     lastMsg = msg;
                 }
 
@@ -3589,6 +3606,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
 
+                    U.debug(
+                        log,
+                        "Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
+
                     lastMsg = msg;
                 }
 
@@ -3761,6 +3782,10 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (log.isDebugEnabled())
                         log.debug("Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
 
+                    U.debug(
+                        log,
+                        "Topology version has been updated: [ring=" + ring + ", msg=" + msg + ']');
+
                     lastMsg = msg;
                 }
 
@@ -4240,8 +4265,16 @@ class ServerImpl extends TcpDiscoveryImpl {
                     state0 = spiState;
                 }
 
+                if (msg.verified() && msg.topologyVersion() != ring.topologyVersion()) {
+                    U.debug(log, "Discarding custom message: " + msg + ", topver=" + ring.topologyVersion());
+
+                    return;
+                }
+
                 if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
-                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id();
+                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id() +
+                        ", topver=" + ring.topologyVersion();
+                    assert msg.topologyVersion() == ring.topologyVersion() : "msg: " + msg + ", topver=" + ring.topologyVersion();
 
                     notifyDiscoveryListener(msg);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/271b7501/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
index 2b17696..7ca092c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java
@@ -17,7 +17,17 @@
 
 package org.apache.ignite.spi.discovery.tcp.internal;
 
-import java.util.ArrayList;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.PN;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -29,16 +39,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.PN;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Convenient way to represent topology for {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
@@ -81,6 +81,9 @@ public class TcpDiscoveryNodesRing {
     /** */
     private long nodeOrder;
 
+    /** */
+    private long maxInternalOrder;
+
     /** Lock. */
     @GridToStringExclude
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
@@ -99,6 +102,8 @@ public class TcpDiscoveryNodesRing {
             this.locNode = locNode;
 
             clear();
+
+            maxInternalOrder = locNode.internalOrder();
         }
         finally {
             rwLock.writeLock().unlock();
@@ -204,7 +209,9 @@ public class TcpDiscoveryNodesRing {
             if (nodesMap.containsKey(node.id()))
                 return false;
 
-            assert node.internalOrder() > maxInternalOrder() : "Adding node to the middle of the ring " +
+            long maxInternalOrder0 = maxInternalOrder();
+
+            assert node.internalOrder() > maxInternalOrder0 : "Adding node to the middle of the ring " +
                 "[ring=" + this + ", node=" + node + ']';
 
             nodesMap.put(node.id(), node);
@@ -216,6 +223,8 @@ public class TcpDiscoveryNodesRing {
             nodes.add(node);
 
             nodeOrder = node.internalOrder();
+
+            maxInternalOrder = node.internalOrder();
         }
         finally {
             rwLock.writeLock().unlock();
@@ -231,9 +240,13 @@ public class TcpDiscoveryNodesRing {
         rwLock.readLock().lock();
 
         try {
-            TcpDiscoveryNode last = nodes.last();
+            if (maxInternalOrder == 0) {
+                TcpDiscoveryNode last = nodes.last();
+
+                return last != null ? maxInternalOrder = last.internalOrder() : -1;
+            }
 
-            return last != null ? last.internalOrder() : -1;
+            return maxInternalOrder;
         }
         finally {
             rwLock.readLock().unlock();
@@ -336,47 +349,6 @@ public class TcpDiscoveryNodesRing {
     }
 
     /**
-     * Removes nodes from the topology.
-     *
-     * @param nodeIds IDs of the nodes to remove.
-     * @return Collection of removed nodes.
-     */
-    public Collection<TcpDiscoveryNode> removeNodes(Collection<UUID> nodeIds) {
-        assert !F.isEmpty(nodeIds);
-
-        rwLock.writeLock().lock();
-
-        try {
-            boolean firstRmv = true;
-
-            Collection<TcpDiscoveryNode> res = null;
-
-            for (UUID id : nodeIds) {
-                TcpDiscoveryNode rmv = nodesMap.remove(id);
-
-                if (rmv != null) {
-                    if (firstRmv) {
-                        nodes = new TreeSet<>(nodes);
-
-                        res = new ArrayList<>(nodeIds.size());
-
-                        firstRmv = false;
-                    }
-
-                    nodes.remove(rmv);
-
-                    res.add(rmv);
-                }
-            }
-
-            return res == null ? Collections.<TcpDiscoveryNode>emptyList() : res;
-        }
-        finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    /**
      * Removes all remote nodes, leaves only local node.
      * <p>
      * This should be called when SPI should be disconnected from topology and
@@ -397,6 +369,7 @@ public class TcpDiscoveryNodesRing {
                 nodesMap.put(locNode.id(), locNode);
 
             nodeOrder = 0;
+            maxInternalOrder = 0;
 
             topVer = 0;
         }
@@ -622,13 +595,8 @@ public class TcpDiscoveryNodesRing {
         rwLock.writeLock().lock();
 
         try {
-            if (nodeOrder == 0) {
-                TcpDiscoveryNode last = nodes.last();
-
-                assert last != null;
-
-                nodeOrder = last.internalOrder();
-            }
+            if (nodeOrder == 0)
+                nodeOrder = maxInternalOrder();
 
             return ++nodeOrder;
         }
@@ -681,4 +649,4 @@ public class TcpDiscoveryNodesRing {
             rwLock.readLock().unlock();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/271b7501/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
index 7451911..18c8d8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheValueConsistencyAtomicSelfTest.java
@@ -35,4 +35,4 @@ public class GridCacheValueConsistencyAtomicSelfTest extends GridCacheValueConsi
     @Override protected int iterationCount() {
         return 100_000;
     }
-}
\ No newline at end of file
+}


[07/23] ignite git commit: 1171-debug

Posted by ag...@apache.org.
1171-debug


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af6deb8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af6deb8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af6deb8b

Branch: refs/heads/ignite-1171
Commit: af6deb8bb17cc447c1c7e1fd28ef955bcf8ef76c
Parents: 72baa62
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Sep 21 19:21:36 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Sep 21 19:21:36 2015 -0700

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 168 +++++++++++++------
 .../messages/TcpDiscoveryDiscardMessage.java    |  15 +-
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  33 +++-
 3 files changed, 163 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f625d0d..69dd512 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -37,9 +37,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.Set;
 import java.util.SortedMap;
@@ -1370,8 +1372,14 @@ class ServerImpl extends TcpDiscoveryImpl {
      * @param msgs Messages to include.
      * @param discardMsgId Discarded message ID.
      */
-    private void prepareNodeAddedMessage(TcpDiscoveryAbstractMessage msg, UUID destNodeId,
-        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+    private void prepareNodeAddedMessage(
+        TcpDiscoveryAbstractMessage msg,
+        UUID destNodeId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable IgniteUuid discardMsgId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+        @Nullable IgniteUuid discardCustomMsgId
+        ) {
         assert destNodeId != null;
 
         if (msg instanceof TcpDiscoveryNodeAddedMessage) {
@@ -1395,7 +1403,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 nodeAddedMsg.topology(topToSnd);
-                nodeAddedMsg.messages(msgs, discardMsgId);
+                nodeAddedMsg.messages(msgs, discardMsgId, customMsgs, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
 
@@ -1418,7 +1426,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             nodeAddedMsg.topology(null);
             nodeAddedMsg.topologyHistory(null);
-            nodeAddedMsg.messages(null, null);
+            nodeAddedMsg.messages(null, null, null, null);
         }
     }
 
@@ -1827,7 +1835,7 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
             if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                prepareNodeAddedMessage(msg, destNodeId, null, null);
+                prepareNodeAddedMessage(msg, destNodeId, null, null, null, null);
 
             return msg;
         }
@@ -1836,19 +1844,25 @@ class ServerImpl extends TcpDiscoveryImpl {
     /**
      * Pending messages container.
      */
-    private static class PendingMessages {
+    private static class PendingMessages implements Iterable<TcpDiscoveryAbstractMessage> {
         /** */
         private static final int MAX = 1024;
 
         /** Pending messages. */
         private final Queue<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2);
 
+        /** Pending custom messages. */
+        private final Queue<TcpDiscoveryAbstractMessage> customMsgs = new ArrayDeque<>(MAX * 2);
+
         /** Processed custom message IDs. */
         private Set<IgniteUuid> procCustomMsgs = new GridBoundedLinkedHashSet<IgniteUuid>(MAX * 2);
 
         /** Discarded message ID. */
         private IgniteUuid discardId;
 
+        /** Discarded custom message ID. */
+        private IgniteUuid customDiscardId;
+
         /**
          * Adds pending message and shrinks queue if it exceeds limit
          * (messages that were not discarded yet are never removed).
@@ -1856,10 +1870,12 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to add.
          */
         void add(TcpDiscoveryAbstractMessage msg) {
-            msgs.add(msg);
+            Queue<TcpDiscoveryAbstractMessage> msgs0 = msg instanceof TcpDiscoveryCustomEventMessage ? customMsgs : msgs;
 
-            while (msgs.size() > MAX) {
-                TcpDiscoveryAbstractMessage polled = msgs.poll();
+            msgs0.add(msg);
+
+            while (msgs0.size() > MAX) {
+                TcpDiscoveryAbstractMessage polled = msgs0.poll();
 
                 assert polled != null;
 
@@ -1874,11 +1890,23 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msgs Message.
          * @param discardId Discarded message ID.
          */
-        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId) {
+        void reset(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardId,
+            @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs, @Nullable IgniteUuid duscardCustomId) {
             this.msgs.clear();
+            this.customMsgs.clear();
 
-            if (msgs != null)
-                this.msgs.addAll(msgs);
+            if (msgs != null) {
+                // Backward compatibility: old nodes send messages in one collection.
+                for (TcpDiscoveryAbstractMessage msg : msgs) {
+                    if (msg instanceof TcpDiscoveryCustomEventMessage)
+                        this.customMsgs.add(msg);
+                    else
+                        this.msgs.add(msg);
+                }
+            }
+
+            if (customMsgs != null)
+                this.customMsgs.addAll(customMsgs);
 
             this.discardId = discardId;
         }
@@ -1888,8 +1916,44 @@ class ServerImpl extends TcpDiscoveryImpl {
          *
          * @param id Discarded message ID.
          */
-        void discard(IgniteUuid id) {
-            discardId = id;
+        void discard(IgniteUuid id, boolean custom) {
+            if (custom)
+                customDiscardId = id;
+            else
+                discardId = id;
+        }
+
+        /**
+         * Gets iterator for non-discarded messages.
+         *
+         * @return Non-discarded messages iterator.
+         */
+        public Iterator<TcpDiscoveryAbstractMessage> iterator() {
+            Iterator<TcpDiscoveryAbstractMessage> msgIt = msgs.iterator();
+
+            if (discardId != null) {
+                while (msgIt.hasNext()) {
+                    TcpDiscoveryAbstractMessage msg = msgIt.next();
+
+                    // Skip all messages before discarded, inclusive.
+                    if (discardId.equals(msg.id()))
+                        break;
+                }
+            }
+
+            Iterator<TcpDiscoveryAbstractMessage> customMsgIt = customMsgs.iterator();
+
+            if (customDiscardId != null) {
+                while (customMsgIt.hasNext()) {
+                    TcpDiscoveryAbstractMessage msg = customMsgIt.next();
+
+                    // Skip all messages before discarded, inclusive.
+                    if (customDiscardId.equals(msg.id()))
+                        break;
+                }
+            }
+
+            return F.concat(msgIt, customMsgIt);
         }
     }
 
@@ -2327,21 +2391,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     debugLog("Pending messages will be sent [failure=" + failure +
                                         ", forceSndPending=" + forceSndPending + ']');
 
-                                boolean skip = pendingMsgs.discardId != null;
-
-                                for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
-                                    if (skip) {
-                                        if (pendingMsg.id().equals(pendingMsgs.discardId))
-                                            skip = false;
-
-                                        if (!(pendingMsg instanceof TcpDiscoveryCustomEventMessage))
-                                            continue;
-                                    }
-
+                                for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
                                     long tstamp = U.currentTimeMillis();
 
                                     prepareNodeAddedMessage(pendingMsg, next.id(), pendingMsgs.msgs,
-                                        pendingMsgs.discardId);
+                                        pendingMsgs.discardId, pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
 
                                     if (timeoutHelper == null)
                                         timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi);
@@ -2382,7 +2436,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     msg = new TcpDiscoveryStatusCheckMessage(locNode, null);
                             }
                             else
-                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId);
+                                prepareNodeAddedMessage(msg, next.id(), pendingMsgs.msgs, pendingMsgs.discardId,
+                                    pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
 
                             try {
                                 long tstamp = U.currentTimeMillis();
@@ -2521,17 +2576,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     if (debugMode)
                         log.debug("Pending messages will be resent to local node");
 
-                    boolean skip = pendingMsgs.discardId != null;
-
-                    for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs.msgs) {
-                        if (skip) {
-                            if (pendingMsg.id().equals(pendingMsgs.discardId))
-                                skip = false;
-
-                            continue;
-                        }
-
-                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId);
+                    for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) {
+                        prepareNodeAddedMessage(pendingMsg, locNodeId, pendingMsgs.msgs, pendingMsgs.discardId,
+                            pendingMsgs.customMsgs, pendingMsgs.customDiscardId);
 
                         msgWorker.addMessage(pendingMsg);
 
@@ -3087,7 +3134,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     processNodeAddFinishedMessage(addFinishMsg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3130,6 +3177,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.add(node.id());
 
+                U.debug(log, "Added joining node [joiningNodes=" + joiningNodes + ", node=" + node.id() + ']');
+
                 if (!isLocalNodeCoordinator() && spi.nodeAuth != null && spi.nodeAuth.isGlobalNodeAuthentication()) {
                     boolean authFailed = true;
 
@@ -3251,10 +3300,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                             topHist.clear();
                             topHist.putAll(msg.topologyHistory());
 
-                            pendingMsgs.reset(msg.messages(), msg.discardedMessageId());
+                            pendingMsgs.reset(msg.messages(), msg.discardedMessageId(),
+                                msg.customMessages(), msg.discardedCustomMessageId());
 
                             // Clear data to minimize message size.
-                            msg.messages(null, null);
+                            msg.messages(null, null, null, null);
                             msg.topology(null);
                             msg.topologyHistory(null);
                             msg.clearDiscoveryData();
@@ -3321,7 +3371,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3358,6 +3408,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             joiningNodes.remove(nodeId);
 
+            U.debug(log, "Joining nodes remove1: " + joiningNodes + ", node=" + nodeId);
+
             if (msg.verified() && !locNodeId.equals(nodeId) && spiStateCopy() == CONNECTED && fireEvt) {
                 spi.stats.onNodeJoined();
 
@@ -3499,7 +3551,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3573,6 +3625,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(leftNode.id());
 
+                U.debug(log, "Joining nodes remove2: " + joiningNodes + ", node=" + leftNode.id());
+
                 spi.stats.onNodeLeft();
 
                 notifyDiscovery(EVT_NODE_LEFT, topVer, leftNode);
@@ -3672,7 +3726,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msg.verified()) {
                     spi.stats.onRingMessageReceived(msg);
 
-                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id()));
+                    addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false));
 
                     return;
                 }
@@ -3731,6 +3785,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 joiningNodes.remove(node.id());
 
+                U.debug(log, "Joining nodes remove3: " + joiningNodes + ", node=" + node.id());
+
                 notifyDiscovery(EVT_NODE_FAILED, topVer, node);
 
                 spi.stats.onNodeFailed();
@@ -4072,7 +4128,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             }
 
             if (msg.verified())
-                pendingMsgs.discard(msgId);
+                pendingMsgs.discard(msgId, msg.customMessageDiscard());
 
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);
@@ -4123,6 +4179,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+            U.debug(log, "Processing custom message: " + msg);
+
             if (isLocalNodeCoordinator()) {
                 if (!joiningNodes.isEmpty()) {
                     pendingCustomMsgs.add(msg);
@@ -4133,11 +4191,15 @@ class ServerImpl extends TcpDiscoveryImpl {
                 boolean sndNext = !msg.verified();
 
                 if (sndNext) {
+                    U.debug(log, "Joining nodes are empty on coordinator, will proceed with message: " + msg);
+
                     msg.verify(getLocalNodeId());
                     msg.topologyVersion(ring.topologyVersion());
 
                     if (pendingMsgs.procCustomMsgs.add(msg.id()))
                         notifyDiscoveryListener(msg);
+                    else
+                        sndNext = false;
                 }
 
                 if (sndNext && ring.hasRemoteNodes())
@@ -4167,6 +4229,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                             }
                         }
                     }
+
+                    addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
                 }
             }
             else {
@@ -4176,15 +4240,17 @@ class ServerImpl extends TcpDiscoveryImpl {
                     state0 = spiState;
                 }
 
-                if (msg.verified() && state0 == CONNECTED) {
-                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes;
+                if (msg.verified() && state0 == CONNECTED && pendingMsgs.procCustomMsgs.add(msg.id())) {
+                    assert joiningNodes.isEmpty() : "Joining nodes: " + joiningNodes + ", msg=" + msg + ", loc=" + locNode.id();
 
-                    if (pendingMsgs.procCustomMsgs.add(msg.id()))
-                        notifyDiscoveryListener(msg);
+                    notifyDiscoveryListener(msg);
                 }
 
-                if (ring.hasRemoteNodes())
+                if (ring.hasRemoteNodes()) {
+                    U.debug(log, "Will send message to the next node in topology [next=" + next + ", msg=" + msg + ']');
+
                     sendMessageAcrossRing(msg);
+                }
             }
         }
 
@@ -5130,7 +5196,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                             log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
                                 + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        prepareNodeAddedMessage(msg, clientNodeId, null, null);
+                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null, null);
 
                         writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
                             spi.failureDetectionTimeout() : spi.getSocketTimeout());

http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
index 1e1fa6b..145f19e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryDiscardMessage.java
@@ -32,16 +32,20 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
     /** ID of the message to discard (this and all preceding). */
     private final IgniteUuid msgId;
 
+    /** True if this is discard ID for custom event message. */
+    private final boolean customMsgDiscard;
+
     /**
      * Constructor.
      *
      * @param creatorNodeId Creator node ID.
      * @param msgId Message ID.
      */
-    public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId) {
+    public TcpDiscoveryDiscardMessage(UUID creatorNodeId, IgniteUuid msgId, boolean customMsgDiscard) {
         super(creatorNodeId);
 
         this.msgId = msgId;
+        this.customMsgDiscard = customMsgDiscard;
     }
 
     /**
@@ -53,6 +57,15 @@ public class TcpDiscoveryDiscardMessage extends TcpDiscoveryAbstractMessage {
         return msgId;
     }
 
+    /**
+     * Flag indicating whether the ID to discard is for a custom message or not.
+     *
+     * @return Custom message flag.
+     */
+    public boolean customMessageDiscard() {
+        return customMsgDiscard;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryDiscardMessage.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/af6deb8b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 01c6789..789f2b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -48,6 +48,12 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     /** Discarded message ID. */
     private IgniteUuid discardMsgId;
 
+    /** Pending custom messages from previous node. */
+    private Collection<TcpDiscoveryAbstractMessage> customMsgs;
+
+    /** Discarded custom message ID. */
+    private IgniteUuid discardCustomMsgId;
+
     /** Current topology. Initialized by coordinator. */
     @GridToStringInclude
     private Collection<TcpDiscoveryNode> top;
@@ -117,14 +123,39 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * Gets pending custom messages sent to new node by its previous.
+     *
+     * @return Pending custom messages from previous node.
+     */
+    @Nullable public Collection<TcpDiscoveryAbstractMessage> customMessages() {
+        return customMsgs;
+    }
+
+    /**
+     * Gets discarded custom message ID.
+     *
+     * @return Discarded custom message ID.
+     */
+    @Nullable public IgniteUuid discardedCustomMessageId() {
+        return discardCustomMsgId;
+    }
+
+    /**
      * Sets pending messages to send to new node.
      *
      * @param msgs Pending messages to send to new node.
      * @param discardMsgId Discarded message ID.
      */
-    public void messages(@Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @Nullable IgniteUuid discardMsgId) {
+    public void messages(
+        @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
+        @Nullable IgniteUuid discardMsgId,
+        @Nullable Collection<TcpDiscoveryAbstractMessage> customMsgs,
+        @Nullable IgniteUuid discardCustomMsgId
+    ) {
         this.msgs = msgs;
         this.discardMsgId = discardMsgId;
+        this.customMsgs = customMsgs;
+        this.discardCustomMsgId = discardCustomMsgId;
     }
 
     /**