Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2022/11/03 18:52:41 UTC

[GitHub] [ignite-3] tkalkirill opened a new pull request, #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

tkalkirill opened a new pull request, #1313:
URL: https://github.com/apache/ignite-3/pull/1313

   https://issues.apache.org/jira/browse/IGNITE-18022


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1030488686


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageOnRebalance.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decorator for {@link TxStateStorage} on full rebalance.
+ *
+ * <p>Allows you to use methods to change data, throws {@link IllegalStateException} when reading data.
+ */
+public class TxStateStorageOnRebalance implements TxStateStorage {
+    private static final String ERROR_MESSAGE = "Transaction state storage is in full rebalancing, data reading is not available.";
+
+    private final TxStateStorage delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param delegate Delegate.
+     */
+    public TxStateStorageOnRebalance(TxStateStorage delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public TxMeta get(UUID txId) {
+        throw createDataWriteOnlyException();
+    }
+
+    @Override
+    public void put(UUID txId, TxMeta txMeta) {
+        delegate.put(txId, txMeta);
+    }
+
+    @Override
+    public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex) {
+        return delegate.compareAndSet(txId, txStateExpected, txMeta, commandIndex);
+    }
+
+    @Override
+    public void remove(UUID txId) {
+        delegate.remove(txId);
+    }
+
+    @Override
+    public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
+        throw createDataWriteOnlyException();
+    }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return delegate.flush();
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return delegate.lastAppliedIndex();
+    }
+
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) {
+        delegate.lastAppliedIndex(lastAppliedIndex);
+    }
+
+    @Override
+    public long persistedIndex() {
+        return delegate.persistedIndex();
+    }
+
+    @Override
+    public void destroy() {
+        delegate.destroy();
+    }
+
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+
+    private IllegalStateException createDataWriteOnlyException() {
+        return new IllegalStateException(ERROR_MESSAGE);

Review Comment:
   Changed it





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1013995639


##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateTableStorageTest.java:
##########
@@ -0,0 +1,170 @@
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract class for {@link TxStateTableStorage} testing.
+ */
+public abstract class AbstractTxStateTableStorageTest {
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_ID_1 = 1;
+
+    private TxStateTableStorage tableStorage;
+
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(TxStateTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Test
+    public void testStartRebalance() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalance(PARTITION_ID_1));
+
+        UUID txId = UUID.randomUUID();
+        TxMeta txMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        assertThat(storage, instanceOf(TxStateStorageDecorator.class));
+
+        storage.put(txId, txMeta);
+
+        storage.lastAppliedIndex(100);
+
+        storage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage0 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertNotNull(newStorage0);
+
+        assertSame(storage, newStorage0);
+
+        assertEquals(0L, newStorage0.lastAppliedIndex());
+        assertEquals(0L, newStorage0.persistedIndex());
+
+        assertThat(getAll(newStorage0.scan()), empty());
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);

Review Comment:
   Sounds reasonable.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1013982218


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageOnRebalance.java:
##########
@@ -0,0 +1,105 @@
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decorator for {@link TxStateStorage} on full rebalance.
+ *
+ * <p>Allows you to use methods to change data, throws {@link IllegalStateException} when reading data.
+ */
+public class TxStateStorageOnRebalance implements TxStateStorage {
+    private static final String ERROR_MESSAGE = "Transaction state storage is in full rebalancing, data reading is not available.";
+
+    private final TxStateStorage delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param delegate Delegate.
+     */
+    public TxStateStorageOnRebalance(TxStateStorage delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public TxMeta get(UUID txId) {
+        throw createDataWriteOnlyException();

Review Comment:
   Fixed it.





[GitHub] [ignite-3] tkalkirill closed pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill closed pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver
URL: https://github.com/apache/ignite-3/pull/1313




[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1013983119


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageOnRebalance.java:
##########
@@ -0,0 +1,105 @@
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decorator for {@link TxStateStorage} on full rebalance.
+ *
+ * <p>Allows you to use methods to change data, throws {@link IllegalStateException} when reading data.
+ */
+public class TxStateStorageOnRebalance implements TxStateStorage {
+    private static final String ERROR_MESSAGE = "Transaction state storage is in full rebalancing, data reading is not available.";
+
+    private final TxStateStorage delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param delegate Delegate.
+     */
+    public TxStateStorageOnRebalance(TxStateStorage delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public TxMeta get(UUID txId) {
+        throw createDataWriteOnlyException();
+    }
+
+    @Override
+    public void put(UUID txId, TxMeta txMeta) {
+        delegate.put(txId, txMeta);
+    }
+
+    @Override
+    public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex) {
+        return delegate.compareAndSet(txId, txStateExpected, txMeta, commandIndex);
+    }
+
+    @Override
+    public void remove(UUID txId) {
+        delegate.remove(txId);
+    }
+
+    @Override
+    public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
+        throw createDataWriteOnlyException();
+    }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return delegate.flush();
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return delegate.lastAppliedIndex();
+    }
+
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) {
+        delegate.lastAppliedIndex(lastAppliedIndex);
+    }
+
+    @Override
+    public long persistedIndex() {
+        return delegate.persistedIndex();
+    }
+
+    @Override
+    public void destroy() {
+        delegate.destroy();
+    }
+
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+
+    private IllegalStateException createDataWriteOnlyException() {
+        return new IllegalStateException(ERROR_MESSAGE);

Review Comment:
   Yes, we can fix it later.





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1013984016


##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateTableStorageTest.java:
##########
@@ -0,0 +1,170 @@
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract class for {@link TxStateTableStorage} testing.
+ */
+public abstract class AbstractTxStateTableStorageTest {
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_ID_1 = 1;
+
+    private TxStateTableStorage tableStorage;
+
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(TxStateTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Test
+    public void testStartRebalance() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalance(PARTITION_ID_1));
+
+        UUID txId = UUID.randomUUID();
+        TxMeta txMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        assertThat(storage, instanceOf(TxStateStorageDecorator.class));
+
+        storage.put(txId, txMeta);
+
+        storage.lastAppliedIndex(100);
+
+        storage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage0 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertNotNull(newStorage0);
+
+        assertSame(storage, newStorage0);
+
+        assertEquals(0L, newStorage0.lastAppliedIndex());
+        assertEquals(0L, newStorage0.persistedIndex());
+
+        assertThat(getAll(newStorage0.scan()), empty());
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage1 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertSame(newStorage0, newStorage1);
+    }
+
+    @Test
+    public void testAbortRebalance() throws Exception {
+        assertDoesNotThrow(() -> tableStorage.abortRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        UUID oldTxId = UUID.randomUUID();
+        TxMeta oldTxMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        storage.put(oldTxId, oldTxMeta);
+
+        storage.lastAppliedIndex(100);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        UUID newTxId = UUID.randomUUID();
+        TxMeta newTxMeta = mock(TxMeta.class);
+
+        storage.put(newTxId, newTxMeta);
+
+        storage.lastAppliedIndex(500);
+
+        tableStorage.abortRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        assertSame(storage, tableStorage.getTxStateStorage(PARTITION_ID));
+
+        assertDoesNotThrow(() -> tableStorage.abortRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        assertEquals(100L, storage.lastAppliedIndex());
+
+        assertThat(getAll(storage.scan()), containsInAnyOrder(new IgniteBiTuple<>(oldTxId, oldTxMeta)));
+    }
+
+    @Test
+    public void testFinishRebalance() throws Exception {
+        assertDoesNotThrow(() -> tableStorage.finishRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        UUID oldTxId = UUID.randomUUID();
+        TxMeta oldTxMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        storage.put(oldTxId, oldTxMeta);
+
+        storage.lastAppliedIndex(100);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        UUID newTxId = UUID.randomUUID();
+        TxMeta newTxMeta = mock(TxMeta.class);
+
+        storage.put(newTxId, newTxMeta);
+
+        storage.lastAppliedIndex(500);
+
+        tableStorage.finishRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        assertSame(storage, tableStorage.getTxStateStorage(PARTITION_ID));
+
+        assertDoesNotThrow(() -> tableStorage.finishRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        assertEquals(500L, storage.lastAppliedIndex());
+
+        assertThat(getAll(storage.scan()), containsInAnyOrder(new IgniteBiTuple<>(newTxId, newTxMeta)));
+    }
+
+    private static <T> List<T> getAll(Cursor<T> cursor) {

Review Comment:
   Fixed it.
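
Editorial sketch of the rebalance lifecycle that the three tests in the hunk above assert: `startRebalance` leaves the storage empty with `lastAppliedIndex` reset to 0, `abortRebalance` restores the pre-rebalance data and index, and `finishRebalance` keeps only what was written during the rebalance. This is a minimal in-memory model, not the PR's implementation: `RebalanceModel`, its snapshot fields, and the use of `String` in place of `TxMeta` are all hypothetical, and treating a repeated start as a no-op is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory model of one partition's tx state during a full rebalance. */
class RebalanceModel {
    private Map<UUID, String> data = new HashMap<>();
    private long lastAppliedIndex;

    // Snapshot of the pre-rebalance state; null when no rebalance is in progress.
    private Map<UUID, String> savedData;
    private long savedIndex;

    void put(UUID txId, String meta) {
        data.put(txId, meta);
    }

    void lastAppliedIndex(long index) {
        lastAppliedIndex = index;
    }

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    /** Returns a copy of the currently visible entries. */
    Map<UUID, String> scan() {
        return new HashMap<>(data);
    }

    void startRebalance() {
        if (savedData != null) {
            return; // Assumption: a repeated start is a no-op.
        }
        savedData = data;
        savedIndex = lastAppliedIndex;
        data = new HashMap<>();
        lastAppliedIndex = 0;
    }

    void abortRebalance() {
        if (savedData == null) {
            return; // Nothing to abort, mirrors assertDoesNotThrow in the test.
        }
        data = savedData;
        lastAppliedIndex = savedIndex;
        savedData = null;
    }

    void finishRebalance() {
        savedData = null; // Drop the old state; a no-op when not rebalancing.
    }
}

class RebalanceModelDemo {
    public static void main(String[] args) {
        RebalanceModel storage = new RebalanceModel();
        storage.put(UUID.randomUUID(), "old");
        storage.lastAppliedIndex(100);

        storage.startRebalance();
        storage.put(UUID.randomUUID(), "new");
        storage.lastAppliedIndex(500);

        storage.abortRebalance();
        System.out.println(storage.lastAppliedIndex() + " " + storage.scan().values());
    }
}
```

Keeping the old state until `finishRebalance` is what lets an aborted rebalance return to the data the receiver had before, which is the "without losing user data" part of the PR title.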





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1030488995


##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateTableStorageTest.java:
##########
@@ -0,0 +1,170 @@
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract class for {@link TxStateTableStorage} testing.
+ */
+public abstract class AbstractTxStateTableStorageTest {
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_ID_1 = 1;
+
+    private TxStateTableStorage tableStorage;
+
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(TxStateTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Test
+    public void testStartRebalance() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalance(PARTITION_ID_1));
+
+        UUID txId = UUID.randomUUID();
+        TxMeta txMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        assertThat(storage, instanceOf(TxStateStorageDecorator.class));
+
+        storage.put(txId, txMeta);
+
+        storage.lastAppliedIndex(100);
+
+        storage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage0 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertNotNull(newStorage0);
+
+        assertSame(storage, newStorage0);
+
+        assertEquals(0L, newStorage0.lastAppliedIndex());
+        assertEquals(0L, newStorage0.persistedIndex());
+
+        assertThat(getAll(newStorage0.scan()), empty());
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);

Review Comment:
   Changed it



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateTableStorageTest.java:
##########
@@ -0,0 +1,170 @@
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract class for {@link TxStateTableStorage} testing.
+ */
+public abstract class AbstractTxStateTableStorageTest {
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_ID_1 = 1;
+
+    private TxStateTableStorage tableStorage;
+
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(TxStateTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Test
+    public void testStartRebalance() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalance(PARTITION_ID_1));
+
+        UUID txId = UUID.randomUUID();
+        TxMeta txMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        assertThat(storage, instanceOf(TxStateStorageDecorator.class));
+
+        storage.put(txId, txMeta);
+
+        storage.lastAppliedIndex(100);
+
+        storage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage0 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertNotNull(newStorage0);
+
+        assertSame(storage, newStorage0);
+
+        assertEquals(0L, newStorage0.lastAppliedIndex());
+        assertEquals(0L, newStorage0.persistedIndex());
+
+        assertThat(getAll(newStorage0.scan()), empty());

Review Comment:
   Changed it





[GitHub] [ignite-3] ibessonov commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
ibessonov commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1013859290


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageOnRebalance.java:
##########
@@ -0,0 +1,105 @@
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decorator for {@link TxStateStorage} on full rebalance.
+ *
+ * <p>Allows you to use methods to change data, throws {@link IllegalStateException} when reading data.
+ */
+public class TxStateStorageOnRebalance implements TxStateStorage {
+    private static final String ERROR_MESSAGE = "Transaction state storage is in full rebalancing, data reading is not available.";
+
+    private final TxStateStorage delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param delegate Delegate.
+     */
+    public TxStateStorageOnRebalance(TxStateStorage delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public TxMeta get(UUID txId) {
+        throw createDataWriteOnlyException();

Review Comment:
   I'd rename this method to either `dataWriteOnlyException()` or `newDataWriteOnlyException()`; the word "create" feels like part of the exception class name.



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateTableStorageTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract class for {@link TxStateTableStorage} testing.
+ */
+public abstract class AbstractTxStateTableStorageTest {
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_ID_1 = 1;
+
+    private TxStateTableStorage tableStorage;
+
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(TxStateTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Test
+    public void testStartRebalance() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalance(PARTITION_ID_1));
+
+        UUID txId = UUID.randomUUID();
+        TxMeta txMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        assertThat(storage, instanceOf(TxStateStorageDecorator.class));
+
+        storage.put(txId, txMeta);
+
+        storage.lastAppliedIndex(100);
+
+        storage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage0 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertNotNull(newStorage0);
+
+        assertSame(storage, newStorage0);
+
+        assertEquals(0L, newStorage0.lastAppliedIndex());
+        assertEquals(0L, newStorage0.persistedIndex());
+
+        assertThat(getAll(newStorage0.scan()), empty());
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);

Review Comment:
   Can we start a rebalance twice? That looks kinda weird; I'd prohibit it.
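
   One way to prohibit a repeated start, as a minimal sketch: track the partitions that are currently in a full rebalance and reject a second start for the same partition. `RebalanceGuard` and its methods are hypothetical names, not part of the PR, and the choice of `IllegalStateException` is an assumption; the PR itself only shows `startRebalance(int)`, `abortRebalance(int)` and `finishRebalance(int)` on `TxStateTableStorage`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not from the PR): tracks which partitions are in a full rebalance. */
final class RebalanceGuard {
    private final Set<Integer> partitionsInRebalance = ConcurrentHashMap.newKeySet();

    /** Marks the partition as rebalancing; a second start for the same partition is rejected. */
    void startRebalance(int partitionId) {
        // Set.add returns false if the element is already present, so check-and-mark is one atomic step.
        if (!partitionsInRebalance.add(partitionId)) {
            throw new IllegalStateException("Full rebalance is already in progress: partitionId=" + partitionId);
        }
    }

    /** Ends the rebalance (abort or finish); returns false if none was in progress. */
    boolean endRebalance(int partitionId) {
        return partitionsInRebalance.remove(partitionId);
    }

    public static void main(String[] args) {
        RebalanceGuard guard = new RebalanceGuard();

        guard.startRebalance(0);

        try {
            guard.startRebalance(0);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

   With such a guard, the second `startRebalance(PARTITION_ID)` call in the test above would be expected to fail rather than return the same storage.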



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorageOnRebalance.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Decorator for {@link TxStateStorage} on full rebalance.
+ *
+ * <p>Allows you to use methods to change data, throws {@link IllegalStateException} when reading data.
+ */
+public class TxStateStorageOnRebalance implements TxStateStorage {
+    private static final String ERROR_MESSAGE = "Transaction state storage is in full rebalancing, data reading is not available.";
+
+    private final TxStateStorage delegate;
+
+    /**
+     * Constructor.
+     *
+     * @param delegate Delegate.
+     */
+    public TxStateStorageOnRebalance(TxStateStorage delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public TxMeta get(UUID txId) {
+        throw createDataWriteOnlyException();
+    }
+
+    @Override
+    public void put(UUID txId, TxMeta txMeta) {
+        delegate.put(txId, txMeta);
+    }
+
+    @Override
+    public boolean compareAndSet(UUID txId, @Nullable TxState txStateExpected, TxMeta txMeta, long commandIndex) {
+        return delegate.compareAndSet(txId, txStateExpected, txMeta, commandIndex);
+    }
+
+    @Override
+    public void remove(UUID txId) {
+        delegate.remove(txId);
+    }
+
+    @Override
+    public Cursor<IgniteBiTuple<UUID, TxMeta>> scan() {
+        throw createDataWriteOnlyException();
+    }
+
+    @Override
+    public CompletableFuture<Void> flush() {
+        return delegate.flush();
+    }
+
+    @Override
+    public long lastAppliedIndex() {
+        return delegate.lastAppliedIndex();
+    }
+
+    @Override
+    public void lastAppliedIndex(long lastAppliedIndex) {
+        delegate.lastAppliedIndex(lastAppliedIndex);
+    }
+
+    @Override
+    public long persistedIndex() {
+        return delegate.persistedIndex();
+    }
+
+    @Override
+    public void destroy() {
+        delegate.destroy();
+    }
+
+    @Override
+    public void close() throws Exception {
+        delegate.close();
+    }
+
+    private IllegalStateException createDataWriteOnlyException() {
+        return new IllegalStateException(ERROR_MESSAGE);

Review Comment:
   I would prefer a more specific type, but I guess we haven't started moving to a good exception structure in storages yet.
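
   As a sketch of what a more specific type could look like: a dedicated exception that still extends `IllegalStateException`, so existing catch blocks keep working while new callers can catch the narrower type. `TxStateStorageRebalanceException` and `WriteOnlyReadGuard` are hypothetical names, not existing Ignite classes; only the message text is taken from `ERROR_MESSAGE` in the diff above.

```java
/** Hypothetical exception type (not an existing Ignite class) for reads attempted during a full rebalance. */
class TxStateStorageRebalanceException extends IllegalStateException {
    TxStateStorageRebalanceException(String message) {
        super(message);
    }
}

/** Hypothetical stand-in for the read side of the write-only decorator. */
final class WriteOnlyReadGuard {
    private static final String ERROR_MESSAGE =
            "Transaction state storage is in full rebalancing, data reading is not available.";

    /** Stand-in for a read method such as get(UUID): always rejects the read. */
    static Object read() {
        throw new TxStateStorageRebalanceException(ERROR_MESSAGE);
    }

    public static void main(String[] args) {
        try {
            read();
        } catch (TxStateStorageRebalanceException e) {
            System.out.println(e.getMessage());
        }
    }
}
```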



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateTableStorageTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract class for {@link TxStateTableStorage} testing.
+ */
+public abstract class AbstractTxStateTableStorageTest {
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_ID_1 = 1;
+
+    private TxStateTableStorage tableStorage;
+
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(TxStateTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Test
+    public void testStartRebalance() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalance(PARTITION_ID_1));
+
+        UUID txId = UUID.randomUUID();
+        TxMeta txMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        assertThat(storage, instanceOf(TxStateStorageDecorator.class));
+
+        storage.put(txId, txMeta);
+
+        storage.lastAppliedIndex(100);
+
+        storage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage0 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertNotNull(newStorage0);
+
+        assertSame(storage, newStorage0);
+
+        assertEquals(0L, newStorage0.lastAppliedIndex());
+        assertEquals(0L, newStorage0.persistedIndex());
+
+        assertThat(getAll(newStorage0.scan()), empty());
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage1 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertSame(newStorage0, newStorage1);
+    }
+
+    @Test
+    public void testAbortRebalance() throws Exception {
+        assertDoesNotThrow(() -> tableStorage.abortRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        UUID oldTxId = UUID.randomUUID();
+        TxMeta oldTxMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        storage.put(oldTxId, oldTxMeta);
+
+        storage.lastAppliedIndex(100);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        UUID newTxId = UUID.randomUUID();
+        TxMeta newTxMeta = mock(TxMeta.class);
+
+        storage.put(newTxId, newTxMeta);
+
+        storage.lastAppliedIndex(500);
+
+        tableStorage.abortRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        assertSame(storage, tableStorage.getTxStateStorage(PARTITION_ID));
+
+        assertDoesNotThrow(() -> tableStorage.abortRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        assertEquals(100L, storage.lastAppliedIndex());
+
+        assertThat(getAll(storage.scan()), containsInAnyOrder(new IgniteBiTuple<>(oldTxId, oldTxMeta)));
+    }
+
+    @Test
+    public void testFinishRebalance() throws Exception {
+        assertDoesNotThrow(() -> tableStorage.finishRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        UUID oldTxId = UUID.randomUUID();
+        TxMeta oldTxMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        storage.put(oldTxId, oldTxMeta);
+
+        storage.lastAppliedIndex(100);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        UUID newTxId = UUID.randomUUID();
+        TxMeta newTxMeta = mock(TxMeta.class);
+
+        storage.put(newTxId, newTxMeta);
+
+        storage.lastAppliedIndex(500);
+
+        tableStorage.finishRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        assertSame(storage, tableStorage.getTxStateStorage(PARTITION_ID));
+
+        assertDoesNotThrow(() -> tableStorage.finishRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS));
+
+        assertEquals(500L, storage.lastAppliedIndex());
+
+        assertThat(getAll(storage.scan()), containsInAnyOrder(new IgniteBiTuple<>(newTxId, newTxMeta)));
+    }
+
+    private static <T> List<T> getAll(Cursor<T> cursor) {

Review Comment:
   Why not just throw an exception? I see no point in wrapping it in a runtime exception.
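
   A minimal sketch of the suggestion, assuming the helper only drains and closes the cursor (the body of `getAll` is not shown in the hunk above). `SimpleCursor` is a hypothetical stand-in for Ignite's `Cursor`, assumed here to be an `Iterator` that is also `AutoCloseable`; `CursorUtil` and `cursorOf` exist only for the demonstration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Minimal stand-in for Ignite's Cursor (assumption: an Iterator that is also AutoCloseable). */
interface SimpleCursor<T> extends Iterator<T>, AutoCloseable {
}

final class CursorUtil {
    /** Drains the cursor into a list and closes it; a failure in close() propagates unwrapped. */
    static <T> List<T> getAll(SimpleCursor<T> cursor) throws Exception {
        try (SimpleCursor<T> c = cursor) {
            List<T> res = new ArrayList<>();

            while (c.hasNext()) {
                res.add(c.next());
            }

            return res;
        }
    }

    /** Wraps a plain iterator into the stand-in cursor, for demonstration only. */
    static <T> SimpleCursor<T> cursorOf(Iterator<T> it) {
        return new SimpleCursor<T>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public T next() {
                return it.next();
            }

            @Override
            public void close() {
                // Nothing to release in this demo.
            }
        };
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getAll(cursorOf(List.of(1, 2, 3).iterator())));
    }
}
```

   Since the test methods above already declare `throws Exception`, a helper with this signature can be called from them without any try/catch.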



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateTableStorageTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.storage.state;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.configuration.storage.StorageException;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Abstract class for {@link TxStateTableStorage} testing.
+ */
+public abstract class AbstractTxStateTableStorageTest {
+    private static final int PARTITION_ID = 0;
+
+    private static final int PARTITION_ID_1 = 1;
+
+    private TxStateTableStorage tableStorage;
+
+    /**
+     * Initializes the internal structures needed for tests.
+     *
+     * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
+     */
+    protected final void initialize(TxStateTableStorage tableStorage) {
+        this.tableStorage = tableStorage;
+    }
+
+    @Test
+    public void testStartRebalance() throws Exception {
+        assertThrows(StorageException.class, () -> tableStorage.startRebalance(PARTITION_ID_1));
+
+        UUID txId = UUID.randomUUID();
+        TxMeta txMeta = mock(TxMeta.class);
+
+        TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(PARTITION_ID);
+
+        assertThat(storage, instanceOf(TxStateStorageDecorator.class));
+
+        storage.put(txId, txMeta);
+
+        storage.lastAppliedIndex(100);
+
+        storage.flush().get(1, TimeUnit.SECONDS);
+
+        tableStorage.startRebalance(PARTITION_ID).get(1, TimeUnit.SECONDS);
+
+        TxStateStorage newStorage0 = tableStorage.getTxStateStorage(PARTITION_ID);
+
+        assertNotNull(newStorage0);
+
+        assertSame(storage, newStorage0);
+
+        assertEquals(0L, newStorage0.lastAppliedIndex());
+        assertEquals(0L, newStorage0.persistedIndex());
+
+        assertThat(getAll(newStorage0.scan()), empty());

Review Comment:
   What happens to old open cursors? I have a bad feeling that we didn't think through some corner cases. It's easy to see that these scans will work in all storages except for persistent page memory (to some extent), but there's a moment before the complete invalidation when we will have to forcefully close cursors. To a lesser degree, this also applies to regular reads.
   Did you have any thoughts about how storage "clients" will operate with them? Should we sit down and think some more? I'm not confident right now that we fully realize what we're doing and its potential impact. Maybe the backup should be fully readable and we only swap it at the end, or maybe we should close cursors or transparently reopen them (for RO transactions, that should be achievable).
   All of that is mainly about MV storage. Here it's not that problematic; RocksDB allows some pretty neat workarounds, right? We should still check everything, though.
   Please think about everything I said and we'll discuss it later.
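
   To make the "forcefully close cursors" option concrete, here is a rough sketch, not something the PR implements: the storage keeps a registry of open cursors and closes whatever is left when a full rebalance starts. `OpenCursorRegistry` and all of its methods are hypothetical, and the sketch deliberately ignores the harder questions above (a readable backup, reopening cursors for RO transactions).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry (not from the PR) of cursors to invalidate when a full rebalance starts. */
final class OpenCursorRegistry {
    private final Set<AutoCloseable> openCursors = ConcurrentHashMap.newKeySet();

    /** Remembers a cursor so that it can be closed forcefully later; returns the same cursor. */
    <C extends AutoCloseable> C register(C cursor) {
        openCursors.add(cursor);

        return cursor;
    }

    /** To be called from a cursor's own close(), so closed cursors are not retained. */
    void unregister(AutoCloseable cursor) {
        openCursors.remove(cursor);
    }

    /** Closes every cursor that is still registered, collecting failures instead of stopping at the first. */
    List<Exception> closeAll() {
        List<Exception> failures = new ArrayList<>();

        for (AutoCloseable cursor : openCursors) {
            // remove() succeeds for exactly one caller, so each cursor is closed at most once by the registry.
            if (openCursors.remove(cursor)) {
                try {
                    cursor.close();
                } catch (Exception e) {
                    failures.add(e);
                }
            }
        }

        return failures;
    }

    public static void main(String[] args) {
        OpenCursorRegistry registry = new OpenCursorRegistry();

        registry.register(() -> System.out.println("cursor closed"));

        System.out.println("failures: " + registry.closeAll().size());
    }
}
```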



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java:
##########
@@ -78,4 +75,49 @@ public interface TxStateTableStorage extends AutoCloseable {
      * @throws StorageException In case when the operation has failed.
      */
     void destroy() throws StorageException;
+
+    /**
+     * Prepares the transaction state storage for rebalancing: makes a backup of the current transaction state storage and creates a new
+     * storage.
+     *
+     * <p>This method must be called before every full rebalance of the transaction state storage, so that in case of errors or cancellation
+     * of the full rebalance, we can restore the transaction state storage from the backup.
+     *
+     * <p>Full rebalance will be completed when one of the methods is called:
+     * <ol>
+     *     <li>{@link #abortRebalance(int)} - in case of a full rebalance cancellation or failure, so that we can restore the transaction
+     *     state storage from a backup;</li>
+     *     <li>{@link #finishRebalance(int)} - in case of a successful full rebalance, to remove the backup of the transaction state
+     *     storage.</li>
+     * </ol>
+     *
+     * <p>Only modification of data in transaction state storage is allowed.
+     *
+     * @param partitionId Partition ID.
+     * @return Future, if completed without errors, then {@link #getTxStateStorage} will return a new (empty) transaction state storage.

Review Comment:
   I wouldn't say that it's "new". Can we rephrase it?





[GitHub] [ignite-3] tkalkirill commented on a diff in pull request #1313: IGNITE-18022 Creating an API for full rebalance without losing user data in TxStateStorage on receiver

Posted by GitBox <gi...@apache.org>.
tkalkirill commented on code in PR #1313:
URL: https://github.com/apache/ignite-3/pull/1313#discussion_r1013994555


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateTableStorage.java:
##########
@@ -78,4 +75,49 @@ public interface TxStateTableStorage extends AutoCloseable {
      * @throws StorageException In case when the operation has failed.
      */
     void destroy() throws StorageException;
+
+    /**
+     * Prepares the transaction state storage for rebalancing: makes a backup of the current transaction state storage and creates a new
+     * storage.
+     *
+     * <p>This method must be called before every full rebalance of the transaction state storage, so that in case of errors or cancellation
+     * of the full rebalance, we can restore the transaction state storage from the backup.
+     *
+     * <p>Full rebalance will be completed when one of the methods is called:
+     * <ol>
+     *     <li>{@link #abortRebalance(int)} - in case of a full rebalance cancellation or failure, so that we can restore the transaction
+     *     state storage from a backup;</li>
+     *     <li>{@link #finishRebalance(int)} - in case of a successful full rebalance, to remove the backup of the transaction state
+     *     storage.</li>
+     * </ol>
+     *
+     * <p>Only modification of data in transaction state storage is allowed.
+     *
+     * @param partitionId Partition ID.
+     * @return Future, if completed without errors, then {@link #getTxStateStorage} will return a new (empty) transaction state storage.

Review Comment:
   `Future, if completed without errors, then {@link #getTxStateStorage} will return a write-only (empty) transaction state storage.`
   
   Is this fine?


