You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Nikita Amelchev (Jira)" <ji...@apache.org> on 2022/11/07 09:02:00 UTC
[jira] [Updated] (IGNITE-17369) Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
[ https://issues.apache.org/jira/browse/IGNITE-17369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nikita Amelchev updated IGNITE-17369:
-------------------------------------
Fix Version/s: 2.15
> Snapshot is inconsistent under streamed loading with 'allowOverwrite==false'.
> -----------------------------------------------------------------------------
>
> Key: IGNITE-17369
> URL: https://issues.apache.org/jira/browse/IGNITE-17369
> Project: Ignite
> Issue Type: Bug
> Reporter: Vladimir Steshin
> Assignee: Vladimir Steshin
> Priority: Major
> Labels: ise, ise.lts
> Fix For: 2.15
>
> Attachments: IgniteClusterShanpshotStreamerTest.java
>
> Time Spent: 4h
> Remaining Estimate: 0h
>
> Ignite fails to restore a snapshot created under streamed load:
> {code:java}
> Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=148]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=29, partitionState=OWNING, size=29, partHash=827765854], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=9, partitionState=OWNING, size=9, partHash=-1515069105]]
> Conflict partition: PartitionKeyV2 [grpId=109386747, grpName=SQL_PUBLIC_TEST_TBL1, partId=146]
> Partition instances: [PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest0, updateCntr=28, partitionState=OWNING, size=28, partHash=1497908810], PartitionHashRecordV2 [isPrimary=false, consistentId=snapshot.IgniteClusterShanpshotStreamerTest1, updateCntr=5, partitionState=OWNING, size=5, partHash=821195757]]
> {code}
> Test (attached):
> {code:java}
> public void testClusterSnapshotConsistencyWithStreamer() throws Exception {
> int grids = 2;
> CountDownLatch loadNumberBeforeSnapshot = new CountDownLatch(60_000);
> AtomicBoolean stopLoading = new AtomicBoolean(false);
> dfltCacheCfg = null;
> Class.forName("org.apache.ignite.IgniteJdbcDriver");
> String tableName = "TEST_TBL1";
> startGrids(grids);
> grid(0).cluster().state(ACTIVE);
> IgniteInternalFuture<?> load1 = runLoad(tableName, false, 1, true, stopLoading, loadNumberBeforeSnapshot);
> loadNumberBeforeSnapshot.await();
> grid(0).snapshot().createSnapshot(SNAPSHOT_NAME).get();
> stopLoading.set(true);
> load1.get();
> grid(0).cache("SQL_PUBLIC_" + tableName).destroy();
> grid(0).snapshot().restoreSnapshot(SNAPSHOT_NAME, F.asList("SQL_PUBLIC_TEST_TBL1")).get();
> }
> /** */
> private IgniteInternalFuture<?> runLoad(String tblName, boolean useCache, int backups, boolean streaming, AtomicBoolean stop,
> CountDownLatch startSnp) {
> return GridTestUtils.runMultiThreadedAsync(() -> {
> if(useCache) {
> String cacheName = "SQL_PUBLIC_" + tblName.toUpperCase();
> IgniteCache<Integer, Object> cache = grid(0)
> .createCache(new CacheConfiguration<Integer, Object>(cacheName).setBackups(backups)
> .setCacheMode(CacheMode.REPLICATED));
> try (IgniteDataStreamer<Integer, Object> ds = grid(0).dataStreamer(cacheName)) {
> for (int i = 0; !stop.get(); ++i) {
> if (streaming)
> ds.addData(i, new Account(i, i - 1));
> else
> cache.put(i, new Account(i, i - 1));
> if (startSnp.getCount() > 0)
> startSnp.countDown();
> Thread.yield();
> }
> }
> } else {
> try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")) {
> createTable(conn, tblName, backups);
> try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO " + tblName +
> "(id, name, orgid, dep) VALUES(?, ?, ?, ?)")) {
> if (streaming)
> conn.prepareStatement("SET STREAMING ON;").execute();
> int leftLimit = 97; // letter 'a'
> int rightLimit = 122; // letter 'z'
> int targetStringLength = 15;
> Random rand = new Random();
> //
> for (int i = 0; !stop.get(); ++i) {
> int orgid = rand.ints(1, 0, 5).findFirst().getAsInt();
> String val = rand.ints(leftLimit, rightLimit + 1).limit(targetStringLength)
> .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
> .toString();
> stmt.setInt(1, i);
> stmt.setString(2, val);
> stmt.setInt(3, orgid);
> stmt.setInt(4, 0);
> stmt.executeUpdate();
> if (startSnp.getCount() > 0)
> startSnp.countDown();
> Thread.yield();
> }
> }
> }
> catch (Exception e) {
> while (startSnp.getCount() > 0)
> startSnp.countDown();
> throw new IgniteException("Unable to load.", e);
> }
> }
> }, 1, "load-thread-" + tblName);
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)