Posted to issues@ignite.apache.org by "Vladimir Steshin (Jira)" <ji...@apache.org> on 2022/10/03 12:47:00 UTC

[jira] [Comment Edited] (IGNITE-17369) Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.

    [ https://issues.apache.org/jira/browse/IGNITE-17369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17612278#comment-17612278 ] 

Vladimir Steshin edited comment on IGNITE-17369 at 10/3/22 12:46 PM:
---------------------------------------------------------------------

A snapshot can begin with the primary and backup copies of the same partition in different states. The snapshot process waits for the data streamer futures (GridCacheMvccManager.addDataStreamerFuture()). The problem is that these futures are created separately and concurrently on the primary and backup nodes by IsolatedUpdater. As a result, at the checkpoint some backups might be written without their primaries, and vice versa. No updates are accepted during the checkpoint, so late streamer updates are not written to the partitions being snapshotted.
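
For illustration only, a minimal model of the race in plain Java (no Ignite API, names are made up): two replicas of the same partition apply streamer entries independently, and a "checkpoint" reads both at an arbitrary moment, so the captured states diverge.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class StreamerRaceModel {
    public static void main(String[] args) throws Exception {
        List<Integer> primary = new ArrayList<>();
        List<Integer> backup = new ArrayList<>();

        // Each replica applies its "streamer batches" on its own, with no coordination.
        Thread primaryWriter = writer(primary);
        Thread backupWriter = writer(backup);

        primaryWriter.start();
        backupWriter.start();

        Thread.sleep(50); // the "checkpoint" happens at an arbitrary moment

        int p, b;
        synchronized (primary) { p = primary.size(); }
        synchronized (backup) { b = backup.size(); }

        // The two copies of the partition almost never match at this point.
        System.out.printf("primary=%d entries, backup=%d entries%n", p, b);

        primaryWriter.interrupt();
        backupWriter.interrupt();
        primaryWriter.join();
        backupWriter.join();
    }

    private static Thread writer(List<Integer> partition) {
        return new Thread(() -> {
            for (int i = 0; !Thread.currentThread().isInterrupted(); i++) {
                synchronized (partition) {
                    partition.add(i);
                }
            }
        });
    }
}
{code}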

Solutions:

1) V1 (PR 10285).
The PR makes the snapshot process watch the DataStreamer futures. Such a future is created before a streamer batch is written on any node. We cannot rely on it as a final, consistent write of the current/latest streamer batch or of a particular record/entry, but it does tell us that a data streamer is in progress at the checkpoint and that it is paused. We can invalidate the snapshot at this moment.
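
A minimal sketch of the idea, with hypothetical names (this is not the PR 10285 code): the receiver registers a future before writing a batch, and the snapshot checkpoint treats any pending future as a reason to invalidate the snapshot.

{code:java}
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry, similar in spirit to the futures tracked by GridCacheMvccManager. */
class StreamerFutureRegistry {
    private final Set<CompletableFuture<?>> streamerFuts = ConcurrentHashMap.newKeySet();

    /** Registered by the streamer receiver before it writes a batch on this node. */
    void addStreamerFuture(CompletableFuture<?> fut) {
        streamerFuts.add(fut);
        fut.whenComplete((res, err) -> streamerFuts.remove(fut));
    }

    /** Checked from the snapshot checkpoint: a pending future means streamer data
     *  may already be written on some owners but not on the others. */
    boolean mustInvalidateSnapshot() {
        return !streamerFuts.isEmpty();
    }
}
{code}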

2) V2 (PR 10286).
IsolatedUpdater could simply notify the snapshot process, if one exists, that a concurrent, potentially inconsistent update is in progress. A notification for at least one entry on any node would be enough. This should work in practice, but in theory the solution is not resilient: one streamer batch could have been written entirely before the snapshot and a second one after it. If the first batch writes a partition only on the primaries or only on the backups and the second batch writes the rest, the snapshot is inconsistent.
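
A rough sketch of the notification, again with hypothetical names (not the PR 10286 code): the isolated receiver flags the snapshot currently running on the node, and a single flagged entry is enough to mark the result as possibly inconsistent.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical hand-off between the streamer receiver and a running snapshot. */
class StreamerSnapshotNotifier {
    /** Warning flag of the snapshot currently running on this node, if any. */
    private final AtomicReference<AtomicBoolean> curSnpWarning = new AtomicReference<>();

    void onSnapshotStarted(AtomicBoolean streamerWarningFlag) { curSnpWarning.set(streamerWarningFlag); }

    void onSnapshotFinished() { curSnpWarning.set(null); }

    /** Called by the isolated receiver for every applied entry; one call is enough. */
    void onStreamerEntryApplied() {
        AtomicBoolean warn = curSnpWarning.get();
        if (warn != null)
            warn.set(true);
    }
}
{code}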

3) V3 (PR 10284).
We could mark the data streamer as active upon the first streamer batch received, and unmark it somehow later. If the data streamer is marked as active, the snapshot process could check this mark. Since the mark is set before the data is written, it is set before the data streamer future which the snapshot process waits for.

The problem is how to clear such a mark. When the node leaves? A node can live forever. Send a special closing request? The streamer node may not close the streamer at all, meaning close() is never invoked. Moreover, the data streamer works through CommunicationSPI, which does not guarantee delivery, so we cannot be sure the closing request is delivered and the streamer is unmarked. While the closing requests are processed, a rebalance can happen, which has to be handled as well; or the data streamer can be cancelled. It looks like we need a discovery-based closing message, which would be much simpler and more reliable.

This solution looks hazardous.
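
For reference, a sketch of what the mark itself could look like (hypothetical names, not the PR 10284 code); the hard part discussed above is who reliably clears it.

{code:java}
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-node mark of streamers that are known to be loading data. */
class ActiveStreamersMark {
    private final Set<UUID> activeStreamers = ConcurrentHashMap.newKeySet();

    /** The first received batch marks the streamer active, before any data is written. */
    void onFirstBatch(UUID streamerId) { activeStreamers.add(streamerId); }

    /** Clearing the mark needs a reliable close/cancel/node-left signal,
     *  e.g. a discovery message rather than CommunicationSPI. */
    void onStreamerClosed(UUID streamerId) { activeStreamers.remove(streamerId); }

    /** Checked by the snapshot process before/at the checkpoint. */
    boolean streamersActive() { return !activeStreamers.isEmpty(); }
}
{code}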


> Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
> -----------------------------------------------------------------------------
>
>                 Key: IGNITE-17369
>                 URL: https://issues.apache.org/jira/browse/IGNITE-17369
>             Project: Ignite
>          Issue Type: Bug
>            Reporter: Vladimir Steshin
>            Assignee: Vladimir Steshin
>            Priority: Major
>              Labels: ise, ise.lts
>         Attachments: IgniteClusterShanpshotStreamerTest.java
>
>
> Ignite fails to restore snapshot created under streamed load:
> {code:java}
> Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
> Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
> {code}
> Test (attached):
> {code:java}
>     public void testClusterSnapshotConsistencyWithStreamer() throws Exception {
>         int grids = 2;
>         CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
>         AtomicBoolean stopLoading = new AtomicBoolean(false);
>         dfltCacheCfg = null;
>         Class.forName("org.apache.ignite.IgniteJdbcDriver");
>         String tableName = "TEST_TBL1";
>         startGrids(grids);
>         grid(0).cluster().state(ACTIVE);
>         IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, stopLoading, loadNumberBeforeSnapshot);
>         loadNumberBeforeSnapshot.await();
>         grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
>         stopLoading.set(true);
>         load1.get();
>         grid(0).cache("SQL_PUBLIC_" + tableName).destroy();
>         grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, F.asList("SQL_PUBLIC_TEST_TBL1")).get();
>     }
>     /** */
>     private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, int backups, boolean streaming, AtomicBoolean stop,
>         CountDownLatch startSnp) {
>         return GridTestUtils.runMultiThreadedAsync(() -> {
>             if(useCache) {
>                 String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();
>                 IgniteCache<Integer, Object> cache = grid(0)
>                     .createCache(new CacheConfiguration<Integer, Object>(cacheName).setBackups(backups)
>                         .setCacheMode(CacheMode.REPLICATED));
>                 try (IgniteDataStreamer<Integer, Object> ds = grid(0).dataStreamer(cacheName)) {
>                     for (int i = 0; !stop.get(); ++i) {
>                         if (streaming)
>                             ds.addData(i, new Account(i, i - 1));
>                         else
>                             cache.put(i, new Account(i, i - 1));
>                         if (startSnp.getCount() > 0)
>                             startSnp.countDown();
>                         Thread.yield();
>                     }
>                 }
>             } else {
>                 try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
>                     createTable(conn, tblName, backups);
>                     try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + tblName +
>                         "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
>                         if (streaming)
>                             conn.prepareStatement("SET STREAMING ON;").execute();
>                         int leftLimit = 97; // letter 'a'
>                         int rightLimit = 122; // letter'z'
>                         int targetStringLength = 15;
>                         Random rand = new Random();
> //
>                         for (int i = 0; !stop.get(); ++i) {
>                             int orgid = rand.ints(1, 0, 5).findFirst().getAsInt();
>                             String val = rand.ints(leftLimit, rightLimit + 1).limit(targetStringLength)
>                                 .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
>                                 .toString();
>                             stmt.setInt(1, i);
>                             stmt.setString(2, val);
>                             stmt.setInt(3, orgid);
>                             stmt.setInt(4, 0);
>                             stmt.executeUpdate();
>                             if (startSnp.getCount() > 0)
>                                 startSnp.countDown();
>                             Thread.yield();
>                         }
>                     }
>                 }
>                 catch (Exception e) {
>                     while (startSnp.getCount() > 0)
>                         startSnp.countDown();
>                     throw new IgniteException("Unable to load.", e);
>                 }
>             }
>         }, 1, "load-thread-" + tblName);
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)