You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/29 14:42:26 UTC

[1/4] ignite git commit: IGNITE-10079 Fixed WAL segments compression bug when FileWriteAheadLogManager returns invalid lastCompactedSegment

Repository: ignite
Updated Branches:
  refs/heads/ignite-10044 e0657d718 -> 94ec2f892


IGNITE-10079 Fixed WAL segments compression bug when FileWriteAheadLogManager returns invalid lastCompactedSegment

Signed-off-by: Andrey Gura <ag...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a09d546
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a09d546
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a09d546

Branch: refs/heads/ignite-10044
Commit: 2a09d54625c62acbb05ee71fb98c513b5e2c3183
Parents: 7920646
Author: Andrey Kuznetsov <st...@gmail.com>
Authored: Thu Nov 29 14:54:43 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Thu Nov 29 14:54:43 2018 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           |  34 ++--
 .../wal/aware/SegmentArchivedStorage.java       |  16 ++
 .../persistence/wal/aware/SegmentAware.java     |  13 +-
 .../wal/aware/SegmentCompressStorage.java       |   7 +-
 .../db/wal/WalCompactionAfterRestartTest.java   | 161 +++++++++++++++++++
 .../persistence/wal/aware/SegmentAwareTest.java |  10 +-
 6 files changed, 211 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index b56b64f..fad1ec1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1932,7 +1932,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
             boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
 
-            return reserved ? segmentToCompress : -1;
+            if (reserved)
+                return segmentToCompress;
+            else {
+                segmentAware.onSegmentCompressed(segmentToCompress);
+
+                return -1;
+            }
         }
 
         /** {@inheritDoc} */
@@ -1946,9 +1952,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 long segIdx = -1L;
 
                 try {
-                    segIdx = tryReserveNextSegmentOrWait();
-
-                    if (segIdx <= segmentAware.lastCompressedIdx())
+                    if ((segIdx = tryReserveNextSegmentOrWait()) == -1)
                         continue;
 
                     deleteObsoleteRawSegments();
@@ -1967,21 +1971,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                     Files.move(tmpZip.toPath(), zip.toPath());
 
-                    if (mode != WALMode.NONE) {
-                        try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
-                            f0.force();
-                        }
-
-                        if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) {
-                            evt.record(new WalSegmentCompactedEvent(
-                                    cctx.localNode(),
-                                    segIdx,
-                                    zip.getAbsoluteFile())
-                            );
-                        }
+                    try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
+                        f0.force();
                     }
 
                     segmentAware.onSegmentCompressed(segIdx);
+
+                    if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) {
+                        evt.record(new WalSegmentCompactedEvent(
+                                cctx.localNode(),
+                                segIdx,
+                                zip.getAbsoluteFile())
+                        );
+                    }
                 }
                 catch (IgniteInterruptedCheckedException ignore) {
                     Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index c526ae1..e31628f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -34,6 +34,8 @@ class SegmentArchivedStorage extends SegmentObservable {
      * no segments archived.
      */
     private volatile long lastAbsArchivedIdx = -1;
+    /** Latest truncated segment. */
+    private volatile long lastTruncatedArchiveIdx = -1;
 
     /**
      * @param segmentLockStorage Protects WAL work segments from moving.
@@ -136,4 +138,18 @@ class SegmentArchivedStorage extends SegmentObservable {
     private synchronized void onSegmentUnlocked(long segmentId) {
         notifyAll();
     }
+
+    /**
+     * @param lastTruncatedArchiveIdx Last truncated segment.
+     */
+    void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
+        this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
+    }
+
+    /**
+     * @return Last truncated segment.
+     */
+    long lastTruncatedArchiveIdx() {
+        return lastTruncatedArchiveIdx;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index e46d93f..a14f0ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -27,8 +27,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.
  * Holder of actual information of latest manipulation on WAL segments.
  */
 public class SegmentAware {
-    /** Latest truncated segment. */
-    private volatile long lastTruncatedArchiveIdx = -1L;
     /** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */
     private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage();
     /** Lock on segment protects from archiving segment. */
@@ -106,7 +104,12 @@ public class SegmentAware {
      * there's no segment to archive right now.
      */
     public long waitNextSegmentToCompress() throws IgniteInterruptedCheckedException {
-        return Math.max(segmentCompressStorage.nextSegmentToCompressOrWait(), lastTruncatedArchiveIdx + 1);
+        long idx;
+
+        while ((idx = segmentCompressStorage.nextSegmentToCompressOrWait()) <= lastTruncatedArchiveIdx())
+            onSegmentCompressed(idx);
+
+        return idx;
     }
 
     /**
@@ -152,14 +155,14 @@ public class SegmentAware {
      * @param lastTruncatedArchiveIdx Last truncated segment;
      */
     public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
-        this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
+        segmentArchivedStorage.lastTruncatedArchiveIdx(lastTruncatedArchiveIdx);
     }
 
     /**
      * @return Last truncated segment.
      */
     public long lastTruncatedArchiveIdx() {
-        return lastTruncatedArchiveIdx;
+        return segmentArchivedStorage.lastTruncatedArchiveIdx();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 174fb46..95d4f4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -94,6 +94,9 @@ public class SegmentCompressStorage {
             this.lastCompressedIdx = Math.min(lastMaxCompressedIdx, compressingSegments.get(0) - 1);
         else
             this.lastCompressedIdx = lastMaxCompressedIdx;
+
+        if (compressedIdx > lastEnqueuedToCompressIdx)
+            lastEnqueuedToCompressIdx = compressedIdx;
     }
 
     /**
@@ -120,9 +123,11 @@ public class SegmentCompressStorage {
 
         Long idx = segmentsToCompress.poll();
 
+        assert idx != null;
+
         compressingSegments.add(idx);
 
-        return idx == null ? -1L : idx;
+        return idx;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
new file mode 100644
index 0000000..3685fe7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
@@ -0,0 +1,161 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.WalSegmentCompactedEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
+
+/** Verifies that, after a node restart, every EVT_WAL_SEGMENT_COMPACTED event is consistent with lastCompactedSegment(). */
+public class WalCompactionAfterRestartTest extends GridCommonAbstractTest {
+    /** IP finder handed to the discovery SPI in {@link #getConfiguration(String)}. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)
+                .setMaxSize(200L * 1024 * 1024))
+            .setWalMode(WALMode.LOG_ONLY)
+            .setWalSegmentSize(512 * 1024)
+            .setWalCompactionEnabled(true)
+            .setMaxWalArchiveSize(2 * 512 * 1024)
+        );
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(DEFAULT_CACHE_NAME);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+        ccfg.setBackups(0);
+
+        cfg.setCacheConfiguration(ccfg);
+        cfg.setConsistentId(name);
+
+        cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void test() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        doCachePuts(ig, 10_000);
+
+        ig.cluster().active(false);
+
+        stopGrid(0);
+
+        IgniteEx ig0 = startGrid(0);
+
+        ig0.cluster().active(true);
+
+        List<IgniteBiTuple<Long, Long>> discrepancies = Collections.synchronizedList(new ArrayList<>());
+
+        ig0.events().localListen(e -> {
+            long evtSegIdx = ((WalSegmentCompactedEvent)e).getAbsWalSegmentIdx();
+            long lastCompactedIdx = ig0.context().cache().context().wal().lastCompactedSegment();
+
+            if (lastCompactedIdx < 0 || lastCompactedIdx > evtSegIdx) // Anything outside [0, evtSegIdx] counts as a discrepancy.
+                discrepancies.add(F.t(evtSegIdx, lastCompactedIdx));
+
+            return true;
+        }, EVT_WAL_SEGMENT_COMPACTED);
+
+        doCachePuts(ig0, 5_000);
+
+        stopGrid(0);
+
+        if (!discrepancies.isEmpty()) {
+            fail("Discrepancies (EVT_WAL_SEGMENT_COMPACTED index vs. lastCompactedSegment):" + System.lineSeparator() +
+                discrepancies.stream()
+                    .map(t -> String.format("%d <-> %d", t.get1(), t.get2()))
+                    .collect(Collectors.joining(System.lineSeparator())));
+        }
+    }
+
+    /** Starts 4 "cache-filler" threads putting random keys into the default cache, sleeps {@code millis} ms, then stops and awaits them. */
+    private void doCachePuts(IgniteEx ig, long millis) throws IgniteCheckedException {
+        IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<Long> putFut = GridTestUtils.runMultiThreadedAsync(() -> {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (!stop.get())
+                cache.put(rnd.nextInt(), "Ignite".getBytes());
+        }, 4, "cache-filler");
+
+        U.sleep(millis);
+
+        stop.set(true);
+
+        putFut.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 0869356..d651e01 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -438,18 +438,12 @@ public class SegmentAwareTest extends TestCase {
      * Next segment for compress based on truncated archive idx.
      */
     public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException {
-        //given: thread which awaited segment.
         SegmentAware aware = new SegmentAware(10, true);
 
-        aware.onSegmentCompressed(5);
         aware.setLastArchivedAbsoluteIndex(6);
-        aware.lastTruncatedArchiveIdx(7);
-
-        //when:
-        long segmentToCompress = aware.waitNextSegmentToCompress();
 
-        //then: segment to compress greater than truncated archive idx
-        assertEquals(8, segmentToCompress);
+        for (int exp = 0; exp <= 6; exp++)
+            assertEquals(exp, aware.waitNextSegmentToCompress());
     }
 
     /**


[4/4] ignite git commit: 10044

Posted by sb...@apache.org.
10044


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/94ec2f89
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/94ec2f89
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/94ec2f89

Branch: refs/heads/ignite-10044
Commit: 94ec2f89255aa15fd7860f390167239d7d2f8f37
Parents: a284207
Author: sboikov <sb...@apache.org>
Authored: Thu Nov 29 17:42:13 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Thu Nov 29 17:42:13 2018 +0300

----------------------------------------------------------------------
 .../cache/GridCachePartitionExchangeManager.java        |  2 +-
 .../dht/topology/GridClientPartitionTopology.java       | 12 +++++++++++-
 .../dht/topology/GridDhtPartitionTopologyImpl.java      |  2 +-
 3 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/94ec2f89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6dad367..38cbe46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -854,7 +854,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             aff.partitions());
 
         GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
-            top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), affKey));
+            top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), ccfg.getPartitionLossPolicy(), affKey));
 
         return old != null ? old : top;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/94ec2f89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index feca1e7..02faa51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -127,6 +128,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     private volatile Map<Integer, Long> globalPartSizes;
 
     /** */
+    private final PartitionLossPolicy partLossPlc;
+
+    /** */
     private TreeSet<Integer> lostParts;
 
     /**
@@ -134,6 +138,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
      * @param discoCache Discovery data cache.
      * @param grpId Group ID.
      * @param parts Number of partitions in the group.
+     * @param partLossPlc Partition loss policy.
      * @param similarAffKey Key to find caches with similar affinity.
      */
     public GridClientPartitionTopology(
@@ -141,6 +146,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         DiscoCache discoCache,
         int grpId,
         int parts,
+        PartitionLossPolicy partLossPlc,
         Object similarAffKey
     ) {
         this.cctx = cctx;
@@ -148,6 +154,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
         this.grpId = grpId;
         this.similarAffKey = similarAffKey;
         this.parts = parts;
+        this.partLossPlc = partLossPlc;
 
         topVer = AffinityTopologyVersion.NONE;
 
@@ -944,7 +951,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
             if (cur == null || !cur.equals(parts))
                 changed = true;
 
-            if (lostParts != null) {
+            if (lostParts != null && partLossPlc != PartitionLossPolicy.IGNORE) {
                 for (Integer lostPart : lostParts) {
                     GridDhtPartitionState state0 = parts.get(lostPart);
 
@@ -1005,6 +1012,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
 
     /** {@inheritDoc} */
     @Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
+        if (partLossPlc == PartitionLossPolicy.IGNORE)
+            return false;
+
         lock.writeLock().lock();
 
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/94ec2f89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index d518467..cc8a198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -1807,7 +1807,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (cur == null || !cur.equals(parts))
                     changed = true;
 
-                if (lostParts != null) {
+                if (lostParts != null && grp.config().getPartitionLossPolicy() != PartitionLossPolicy.IGNORE) {
                     for (Integer lostPart : lostParts) {
                         GridDhtPartitionState state0 = parts.get(lostPart);
 


[2/4] ignite git commit: IGNITE-10354 Failing client node due to not receiving metrics updates - Fixes #5485.

Posted by sb...@apache.org.
IGNITE-10354 Failing client node due to not receiving metrics updates - Fixes #5485.

Signed-off-by: Ilya Kasnacheev <il...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4496c77
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4496c77
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4496c77

Branch: refs/heads/ignite-10044
Commit: d4496c77090593125a36454ef74e77b1d0240cae
Parents: 2a09d54
Author: Roman Guseinov <gr...@gmail.com>
Authored: Thu Nov 29 16:40:46 2018 +0300
Committer: Ilya Kasnacheev <il...@gmail.com>
Committed: Thu Nov 29 16:40:46 2018 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   3 +
 .../tcp/internal/TcpDiscoveryNode.java          |   9 +-
 ...ClientDiscoverySpiCoordinatorChangeTest.java | 121 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   2 +
 4 files changed, 134 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4c74706..ce69e78 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -5232,6 +5232,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                             if (clientNodeIds.contains(clientNode.id()))
                                 clientNode.clientAliveTime(spi.clientFailureDetectionTimeout());
                             else {
+                                if (clientNode.clientAliveTime() == 0L)
+                                    clientNode.clientAliveTime(spi.clientFailureDetectionTimeout());
+
                                 boolean aliveCheck = clientNode.isClientAlive();
 
                                 if (!aliveCheck && isLocalNodeCoordinator()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 13d1006..763a678 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -124,7 +124,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite
 
     /** Alive check time (used by clients). */
     @GridToStringExclude
-    private transient long aliveCheckTime;
+    private transient volatile long aliveCheckTime;
 
     /** Client router node ID. */
     @GridToStringExclude
@@ -500,6 +500,13 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite
     }
 
     /**
+     * @return Client alive check time.
+     */
+    public long clientAliveTime() {
+        return aliveCheckTime;
+    }
+
+    /**
      * @return Client router node ID.
      */
     public UUID clientRouterNodeId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java
new file mode 100644
index 0000000..0939e63
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * This class tests that a client is able to connect to another server node without leaving the cluster.
+ */
+public class TcpClientDiscoverySpiCoordinatorChangeTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Checks that a client node doesn't fail because of coordinator change.
+     *
+     * @throws Exception If test fails.
+     */
+    public void testClientNotFailed() throws Exception {
+        TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+        // Start server A.
+        Ignite srvA = startNode("server-a", ipFinder, false);
+
+        // Start the client.
+        Ignite client = startNode("client", ipFinder, true);
+
+        AtomicBoolean clientReconnectState = getClientReconnectState(client);
+
+        // Start server B.
+        Ignite srvB = startNode("server-b", ipFinder, false);
+
+        // Stop server A.
+        srvA.close();
+
+        // Will throw an exception if the client is disconnected.
+        client.getOrCreateCache("CACHE-NAME");
+
+        // Check that the client didn't disconnect/reconnect quickly.
+        assertFalse("Client node was failed and reconnected to the cluster.", clientReconnectState.get());
+
+        // Stop the client.
+        client.close();
+
+        // Stop server B.
+        srvB.close();
+    }
+
+    /**
+     * @param instanceName Instance name.
+     * @param ipFinder IP-finder.
+     * @param clientMode Client mode flag.
+     * @return Started node.
+     * @throws Exception If a node was not started.
+     */
+    private Ignite startNode(String instanceName, TcpDiscoveryIpFinder ipFinder,  boolean clientMode) throws Exception {
+        IgniteConfiguration cfg = getConfiguration(instanceName)
+            .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder))
+            .setClientMode(clientMode);
+
+        return Ignition.start(cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+        return super.getConfiguration(instanceName)
+            .setMetricsUpdateFrequency(Integer.MAX_VALUE)
+            .setClientFailureDetectionTimeout(Integer.MAX_VALUE)
+            .setFailureDetectionTimeout(Integer.MAX_VALUE);
+    }
+
+    /**
+     * @param ignite Client node.
+     * @return Client reconnect state.
+     */
+    private AtomicBoolean getClientReconnectState(Ignite ignite) {
+        final AtomicBoolean reconnectState = new AtomicBoolean(false);
+
+        ignite.events().localListen(
+            new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    if (evt.type() == EventType.EVT_CLIENT_NODE_RECONNECTED)
+                        reconnectState.set(true);
+
+                    return true;
+                }
+            },
+            EventType.EVT_CLIENT_NODE_RECONNECTED
+        );
+
+        return reconnectState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 80f093d..3ef1087 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiCoordinatorChangeTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiSelfTest;
@@ -100,6 +101,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
         suite.addTest(new TestSuite(LongClientConnectToClusterTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
+        suite.addTest(new TestSuite(TcpClientDiscoverySpiCoordinatorChangeTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoveryUnresolvedHostTest.class));


[3/4] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-10044

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-10044


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a2842079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a2842079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a2842079

Branch: refs/heads/ignite-10044
Commit: a2842079eaf5a6be1b909bb5b6df6b952f45b4d1
Parents: e0657d7 d4496c7
Author: sboikov <sb...@apache.org>
Authored: Thu Nov 29 17:31:58 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Thu Nov 29 17:31:58 2018 +0300

----------------------------------------------------------------------
 .../wal/FileWriteAheadLogManager.java           |  34 ++--
 .../wal/aware/SegmentArchivedStorage.java       |  16 ++
 .../persistence/wal/aware/SegmentAware.java     |  13 +-
 .../wal/aware/SegmentCompressStorage.java       |   7 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   3 +
 .../tcp/internal/TcpDiscoveryNode.java          |   9 +-
 .../db/wal/WalCompactionAfterRestartTest.java   | 161 +++++++++++++++++++
 .../persistence/wal/aware/SegmentAwareTest.java |  10 +-
 ...ClientDiscoverySpiCoordinatorChangeTest.java | 121 ++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   2 +
 10 files changed, 345 insertions(+), 31 deletions(-)
----------------------------------------------------------------------