You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2018/11/29 14:42:26 UTC
[1/4] ignite git commit: IGNITE-10079 Fixed WAL segments compression
bug when FileWriteAheadLogManager returns invalid lastCompactedSegment
Repository: ignite
Updated Branches:
refs/heads/ignite-10044 e0657d718 -> 94ec2f892
IGNITE-10079 Fixed WAL segments compression bug when FileWriteAheadLogManager returns invalid lastCompactedSegment
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a09d546
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a09d546
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a09d546
Branch: refs/heads/ignite-10044
Commit: 2a09d54625c62acbb05ee71fb98c513b5e2c3183
Parents: 7920646
Author: Andrey Kuznetsov <st...@gmail.com>
Authored: Thu Nov 29 14:54:43 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Thu Nov 29 14:54:43 2018 +0300
----------------------------------------------------------------------
.../wal/FileWriteAheadLogManager.java | 34 ++--
.../wal/aware/SegmentArchivedStorage.java | 16 ++
.../persistence/wal/aware/SegmentAware.java | 13 +-
.../wal/aware/SegmentCompressStorage.java | 7 +-
.../db/wal/WalCompactionAfterRestartTest.java | 161 +++++++++++++++++++
.../persistence/wal/aware/SegmentAwareTest.java | 10 +-
6 files changed, 211 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index b56b64f..fad1ec1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -1932,7 +1932,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
- return reserved ? segmentToCompress : -1;
+ if (reserved)
+ return segmentToCompress;
+ else {
+ segmentAware.onSegmentCompressed(segmentToCompress);
+
+ return -1;
+ }
}
/** {@inheritDoc} */
@@ -1946,9 +1952,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
long segIdx = -1L;
try {
- segIdx = tryReserveNextSegmentOrWait();
-
- if (segIdx <= segmentAware.lastCompressedIdx())
+ if ((segIdx = tryReserveNextSegmentOrWait()) == -1)
continue;
deleteObsoleteRawSegments();
@@ -1967,21 +1971,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Files.move(tmpZip.toPath(), zip.toPath());
- if (mode != WALMode.NONE) {
- try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
- f0.force();
- }
-
- if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) {
- evt.record(new WalSegmentCompactedEvent(
- cctx.localNode(),
- segIdx,
- zip.getAbsoluteFile())
- );
- }
+ try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
+ f0.force();
}
segmentAware.onSegmentCompressed(segIdx);
+
+ if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) {
+ evt.record(new WalSegmentCompactedEvent(
+ cctx.localNode(),
+ segIdx,
+ zip.getAbsoluteFile())
+ );
+ }
}
catch (IgniteInterruptedCheckedException ignore) {
Thread.currentThread().interrupt();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
index c526ae1..e31628f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -34,6 +34,8 @@ class SegmentArchivedStorage extends SegmentObservable {
* no segments archived.
*/
private volatile long lastAbsArchivedIdx = -1;
+ /** Latest truncated segment. */
+ private volatile long lastTruncatedArchiveIdx = -1;
/**
* @param segmentLockStorage Protects WAL work segments from moving.
@@ -136,4 +138,18 @@ class SegmentArchivedStorage extends SegmentObservable {
private synchronized void onSegmentUnlocked(long segmentId) {
notifyAll();
}
+
+ /**
+ * @param lastTruncatedArchiveIdx Last truncated segment.
+ */
+ void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
+ this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
+ }
+
+ /**
+ * @return Last truncated segment.
+ */
+ long lastTruncatedArchiveIdx() {
+ return lastTruncatedArchiveIdx;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index e46d93f..a14f0ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -27,8 +27,6 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.
* Holder of actual information of latest manipulation on WAL segments.
*/
public class SegmentAware {
- /** Latest truncated segment. */
- private volatile long lastTruncatedArchiveIdx = -1L;
/** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */
private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage();
/** Lock on segment protects from archiving segment. */
@@ -106,7 +104,12 @@ public class SegmentAware {
* there's no segment to archive right now.
*/
public long waitNextSegmentToCompress() throws IgniteInterruptedCheckedException {
- return Math.max(segmentCompressStorage.nextSegmentToCompressOrWait(), lastTruncatedArchiveIdx + 1);
+ long idx;
+
+ while ((idx = segmentCompressStorage.nextSegmentToCompressOrWait()) <= lastTruncatedArchiveIdx())
+ onSegmentCompressed(idx);
+
+ return idx;
}
/**
@@ -152,14 +155,14 @@ public class SegmentAware {
* @param lastTruncatedArchiveIdx Last truncated segment;
*/
public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
- this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
+ segmentArchivedStorage.lastTruncatedArchiveIdx(lastTruncatedArchiveIdx);
}
/**
* @return Last truncated segment.
*/
public long lastTruncatedArchiveIdx() {
- return lastTruncatedArchiveIdx;
+ return segmentArchivedStorage.lastTruncatedArchiveIdx();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 174fb46..95d4f4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -94,6 +94,9 @@ public class SegmentCompressStorage {
this.lastCompressedIdx = Math.min(lastMaxCompressedIdx, compressingSegments.get(0) - 1);
else
this.lastCompressedIdx = lastMaxCompressedIdx;
+
+ if (compressedIdx > lastEnqueuedToCompressIdx)
+ lastEnqueuedToCompressIdx = compressedIdx;
}
/**
@@ -120,9 +123,11 @@ public class SegmentCompressStorage {
Long idx = segmentsToCompress.poll();
+ assert idx != null;
+
compressingSegments.add(idx);
- return idx == null ? -1L : idx;
+ return idx;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
new file mode 100644
index 0000000..3685fe7
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionAfterRestartTest.java
@@ -0,0 +1,161 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.WalSegmentCompactedEvent;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
+
+/** */
+public class WalCompactionAfterRestartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(name);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(200L * 1024 * 1024))
+ .setWalMode(WALMode.LOG_ONLY)
+ .setWalSegmentSize(512 * 1024)
+ .setWalCompactionEnabled(true)
+ .setMaxWalArchiveSize(2 * 512 * 1024)
+ );
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(DEFAULT_CACHE_NAME);
+ ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+ ccfg.setBackups(0);
+
+ cfg.setCacheConfiguration(ccfg);
+ cfg.setConsistentId(name);
+
+ cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_COMPACTED);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void test() throws Exception {
+ IgniteEx ig = startGrid(0);
+
+ ig.cluster().active(true);
+
+ doCachePuts(ig, 10_000);
+
+ ig.cluster().active(false);
+
+ stopGrid(0);
+
+ IgniteEx ig0 = startGrid(0);
+
+ ig0.cluster().active(true);
+
+ List<IgniteBiTuple<Long, Long>> discrepancies = Collections.synchronizedList(new ArrayList<>());
+
+ ig0.events().localListen(e -> {
+ long evtSegIdx = ((WalSegmentCompactedEvent)e).getAbsWalSegmentIdx();
+ long lastCompactedIdx = ig0.context().cache().context().wal().lastCompactedSegment();
+
+ if (lastCompactedIdx < 0 || lastCompactedIdx > evtSegIdx)
+ discrepancies.add(F.t(evtSegIdx, lastCompactedIdx));
+
+ return true;
+ }, EVT_WAL_SEGMENT_COMPACTED);
+
+ doCachePuts(ig0, 5_000);
+
+ stopGrid(0);
+
+ if (!discrepancies.isEmpty()) {
+ fail("Discrepancies (EVT_WAL_SEGMENT_COMPACTED index vs. lastCompactedSegment):" + System.lineSeparator() +
+ discrepancies.stream()
+ .map(t -> String.format("%d <-> %d", t.get1(), t.get2()))
+ .collect(Collectors.joining(System.lineSeparator())));
+ }
+ }
+
+ /** */
+ private void doCachePuts(IgniteEx ig, long millis) throws IgniteCheckedException {
+ IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<Long> putFut = GridTestUtils.runMultiThreadedAsync(() -> {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ while (!stop.get())
+ cache.put(rnd.nextInt(), "Ignite".getBytes());
+ }, 4, "cache-filler");
+
+ U.sleep(millis);
+
+ stop.set(true);
+
+ putFut.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2a09d546/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 0869356..d651e01 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -438,18 +438,12 @@ public class SegmentAwareTest extends TestCase {
* Next segment for compress based on truncated archive idx.
*/
public void testCorrectCalculateNextCompressSegment() throws IgniteCheckedException, InterruptedException {
- //given: thread which awaited segment.
SegmentAware aware = new SegmentAware(10, true);
- aware.onSegmentCompressed(5);
aware.setLastArchivedAbsoluteIndex(6);
- aware.lastTruncatedArchiveIdx(7);
-
- //when:
- long segmentToCompress = aware.waitNextSegmentToCompress();
- //then: segment to compress greater than truncated archive idx
- assertEquals(8, segmentToCompress);
+ for (int exp = 0; exp <= 6; exp++)
+ assertEquals(exp, aware.waitNextSegmentToCompress());
}
/**
[4/4] ignite git commit: 10044
Posted by sb...@apache.org.
10044
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/94ec2f89
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/94ec2f89
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/94ec2f89
Branch: refs/heads/ignite-10044
Commit: 94ec2f89255aa15fd7860f390167239d7d2f8f37
Parents: a284207
Author: sboikov <sb...@apache.org>
Authored: Thu Nov 29 17:42:13 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Thu Nov 29 17:42:13 2018 +0300
----------------------------------------------------------------------
.../cache/GridCachePartitionExchangeManager.java | 2 +-
.../dht/topology/GridClientPartitionTopology.java | 12 +++++++++++-
.../dht/topology/GridDhtPartitionTopologyImpl.java | 2 +-
3 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/94ec2f89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6dad367..38cbe46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -854,7 +854,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
aff.partitions());
GridClientPartitionTopology old = clientTops.putIfAbsent(grpId,
- top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), affKey));
+ top = new GridClientPartitionTopology(cctx, discoCache, grpId, aff.partitions(), ccfg.getPartitionLossPolicy(), affKey));
return old != null ? old : top;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/94ec2f89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index feca1e7..02faa51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -127,6 +128,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
private volatile Map<Integer, Long> globalPartSizes;
/** */
+ private final PartitionLossPolicy partLossPlc;
+
+ /** */
private TreeSet<Integer> lostParts;
/**
@@ -134,6 +138,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
* @param discoCache Discovery data cache.
* @param grpId Group ID.
* @param parts Number of partitions in the group.
+ * @param partLossPlc Partition loss policy.
* @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
@@ -141,6 +146,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
DiscoCache discoCache,
int grpId,
int parts,
+ PartitionLossPolicy partLossPlc,
Object similarAffKey
) {
this.cctx = cctx;
@@ -148,6 +154,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
this.grpId = grpId;
this.similarAffKey = similarAffKey;
this.parts = parts;
+ this.partLossPlc = partLossPlc;
topVer = AffinityTopologyVersion.NONE;
@@ -944,7 +951,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
if (cur == null || !cur.equals(parts))
changed = true;
- if (lostParts != null) {
+ if (lostParts != null && partLossPlc != PartitionLossPolicy.IGNORE) {
for (Integer lostPart : lostParts) {
GridDhtPartitionState state0 = parts.get(lostPart);
@@ -1005,6 +1012,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, DiscoveryEvent discoEvt) {
+ if (partLossPlc == PartitionLossPolicy.IGNORE)
+ return false;
+
lock.writeLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/94ec2f89/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index d518467..cc8a198 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -1807,7 +1807,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (cur == null || !cur.equals(parts))
changed = true;
- if (lostParts != null) {
+ if (lostParts != null && grp.config().getPartitionLossPolicy() != PartitionLossPolicy.IGNORE) {
for (Integer lostPart : lostParts) {
GridDhtPartitionState state0 = parts.get(lostPart);
[2/4] ignite git commit: IGNITE-10354 Failing client node due to not
receiving metrics updates - Fixes #5485.
Posted by sb...@apache.org.
IGNITE-10354 Failing client node due to not receiving metrics updates - Fixes #5485.
Signed-off-by: Ilya Kasnacheev <il...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4496c77
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4496c77
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4496c77
Branch: refs/heads/ignite-10044
Commit: d4496c77090593125a36454ef74e77b1d0240cae
Parents: 2a09d54
Author: Roman Guseinov <gr...@gmail.com>
Authored: Thu Nov 29 16:40:46 2018 +0300
Committer: Ilya Kasnacheev <il...@gmail.com>
Committed: Thu Nov 29 16:40:46 2018 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 3 +
.../tcp/internal/TcpDiscoveryNode.java | 9 +-
...ClientDiscoverySpiCoordinatorChangeTest.java | 121 +++++++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
4 files changed, 134 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4c74706..ce69e78 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -5232,6 +5232,9 @@ class ServerImpl extends TcpDiscoveryImpl {
if (clientNodeIds.contains(clientNode.id()))
clientNode.clientAliveTime(spi.clientFailureDetectionTimeout());
else {
+ if (clientNode.clientAliveTime() == 0L)
+ clientNode.clientAliveTime(spi.clientFailureDetectionTimeout());
+
boolean aliveCheck = clientNode.isClientAlive();
if (!aliveCheck && isLocalNodeCoordinator()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 13d1006..763a678 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -124,7 +124,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite
/** Alive check time (used by clients). */
@GridToStringExclude
- private transient long aliveCheckTime;
+ private transient volatile long aliveCheckTime;
/** Client router node ID. */
@GridToStringExclude
@@ -500,6 +500,13 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Ignite
}
/**
+ * @return Client alive check time.
+ */
+ public long clientAliveTime() {
+ return aliveCheckTime;
+ }
+
+ /**
* @return Client router node ID.
*/
public UUID clientRouterNodeId() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java
new file mode 100644
index 0000000..0939e63
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiCoordinatorChangeTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * This class tests that a client is able to connect to another server node without leaving the cluster.
+ */
+public class TcpClientDiscoverySpiCoordinatorChangeTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Checks that a client node doesn't fail because of coordinator change.
+ *
+ * @throws Exception If test fails.
+ */
+ public void testClientNotFailed() throws Exception {
+ TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ // Start server A.
+ Ignite srvA = startNode("server-a", ipFinder, false);
+
+ // Start the client.
+ Ignite client = startNode("client", ipFinder, true);
+
+ AtomicBoolean clientReconnectState = getClientReconnectState(client);
+
+ // Start server B.
+ Ignite srvB = startNode("server-b", ipFinder, false);
+
+ // Stop server A.
+ srvA.close();
+
+ // Will throw an exception if the client is disconnected.
+ client.getOrCreateCache("CACHE-NAME");
+
+ // Check that the client didn't disconnect/reconnect quickly.
+ assertFalse("Client node was failed and reconnected to the cluster.", clientReconnectState.get());
+
+ // Stop the client.
+ client.close();
+
+ // Stop server B.
+ srvB.close();
+ }
+
+ /**
+ * @param instanceName Instance name.
+ * @param ipFinder IP-finder.
+ * @param clientMode Client mode flag.
+ * @return Started node.
+ * @throws Exception If a node was not started.
+ */
+ private Ignite startNode(String instanceName, TcpDiscoveryIpFinder ipFinder, boolean clientMode) throws Exception {
+ IgniteConfiguration cfg = getConfiguration(instanceName)
+ .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder))
+ .setClientMode(clientMode);
+
+ return Ignition.start(cfg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+ return super.getConfiguration(instanceName)
+ .setMetricsUpdateFrequency(Integer.MAX_VALUE)
+ .setClientFailureDetectionTimeout(Integer.MAX_VALUE)
+ .setFailureDetectionTimeout(Integer.MAX_VALUE);
+ }
+
+ /**
+ * @param ignite Client node.
+ * @return Client reconnect state.
+ */
+ private AtomicBoolean getClientReconnectState(Ignite ignite) {
+ final AtomicBoolean reconnectState = new AtomicBoolean(false);
+
+ ignite.events().localListen(
+ new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EventType.EVT_CLIENT_NODE_RECONNECTED)
+ reconnectState.set(true);
+
+ return true;
+ }
+ },
+ EventType.EVT_CLIENT_NODE_RECONNECTED
+ );
+
+ return reconnectState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4496c77/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 80f093d..3ef1087 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -28,6 +28,7 @@ import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
+import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiCoordinatorChangeTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiSelfTest;
@@ -100,6 +101,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
suite.addTest(new TestSuite(LongClientConnectToClusterTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
+ suite.addTest(new TestSuite(TcpClientDiscoverySpiCoordinatorChangeTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryUnresolvedHostTest.class));
[3/4] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-10044
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-10044
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a2842079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a2842079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a2842079
Branch: refs/heads/ignite-10044
Commit: a2842079eaf5a6be1b909bb5b6df6b952f45b4d1
Parents: e0657d7 d4496c7
Author: sboikov <sb...@apache.org>
Authored: Thu Nov 29 17:31:58 2018 +0300
Committer: sboikov <sb...@apache.org>
Committed: Thu Nov 29 17:31:58 2018 +0300
----------------------------------------------------------------------
.../wal/FileWriteAheadLogManager.java | 34 ++--
.../wal/aware/SegmentArchivedStorage.java | 16 ++
.../persistence/wal/aware/SegmentAware.java | 13 +-
.../wal/aware/SegmentCompressStorage.java | 7 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 3 +
.../tcp/internal/TcpDiscoveryNode.java | 9 +-
.../db/wal/WalCompactionAfterRestartTest.java | 161 +++++++++++++++++++
.../persistence/wal/aware/SegmentAwareTest.java | 10 +-
...ClientDiscoverySpiCoordinatorChangeTest.java | 121 ++++++++++++++
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +
10 files changed, 345 insertions(+), 31 deletions(-)
----------------------------------------------------------------------