You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2023/08/15 20:24:12 UTC

Change in asterixdb[master]: [NO ISSUE][TX][TEST] Add test for atomic statments

From Peeyush Gupta <pe...@couchbase.com>:

Peeyush Gupta has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724 )


Change subject: [NO ISSUE][TX][TEST] Add test for atomic statments
......................................................................

[NO ISSUE][TX][TEST] Add test for atomic statments

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Add test that stops NCs at random times while upserts are
  taking place on standalone collections.
- Fix the issue with rollback and recovery for atomic statements

Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
6 files changed, 167 insertions(+), 3 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/17724/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
index 71a641e..57a74f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
@@ -32,6 +32,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * Message sent from CC to all NCs to rollback an atomic statement/job.
@@ -43,6 +45,8 @@
     private final List<Integer> datasetIds;
     private final Map<String, ILSMComponentId> componentIdMap;
 
+    private static final Logger LOGGER = LogManager.getLogger();
+
     public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds,
             Map<String, ILSMComponentId> componentIdMap) {
         this.jobId = jobId;
@@ -57,9 +61,12 @@
                 datasetLifecycleManager.getIndexCheckpointManagerProvider();
         componentIdMap.forEach((k, v) -> {
             try {
-                IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+                IIndexCheckpointManager checkpointManager =
+                        indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k));
                 if (checkpointManager.getCheckpointCount() > 0) {
                     IndexCheckpoint checkpoint = checkpointManager.getLatest();
+                    LOGGER.debug("Removing checkpoint for resource {} for component id {}", k,
+                            checkpoint.getLastComponentId());
                     if (checkpoint.getLastComponentId() == v.getMaxId()) {
                         checkpointManager.deleteLatest(v.getMaxId(), 1);
                     }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
index 0936512..beaaac0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
@@ -70,6 +70,9 @@
                 deleteInvalidIndex(appContext, localResourceRepository, resource);
             }
         }
+        for (Integer partition : nodePartitions) {
+            localResourceRepository.cleanup(partition);
+        }
         try {
             broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null));
         } catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3f76a31..df2c25d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -112,12 +112,12 @@
         LOGGER.info("Starting Global Recovery");
         MetadataManager.INSTANCE.init();
         MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        rollbackIncompleteAtomicTransactions(appCtx);
         if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
             int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
             performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
         }
         mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
-        rollbackIncompleteAtomicTransactions(appCtx);
         MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         recoveryCompleted = true;
         recovering = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 06380fe..a792b8d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -90,6 +90,15 @@
                 OUTPUT_FORMAT);
     }
 
+    public static void createDatasetWithoutType(String dataset, Map<String, String> fields, boolean pkAutogenerated)
+            throws Exception {
+        StringBuilder stringBuilder = new StringBuilder("");
+        fields.forEach((fName, fType) -> stringBuilder.append(fName).append(":").append(fType).append(","));
+        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + " PRIMARY KEY (" + stringBuilder + ")"
+                + (pkAutogenerated ? "AUTOGENERATED;" : ";"), OUTPUT_FORMAT);
+    }
+
     /**
      * Creates a secondary primary index
      * @param dataset the name of the dataset
@@ -127,6 +136,16 @@
         }
     }
 
+    public static void upsertBulkData(String dataset, long count) throws Exception {
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i < count; i++) {
+            stringBuilder.append("{\"name\": \"name_" + i + "\"},");
+        }
+        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+        TEST_EXECUTOR.executeSqlppUpdateOrDdl("INSERT INTO " + dataset + "([" + stringBuilder + "]);",
+                TestCaseContext.OutputFormat.CLEAN_JSON);
+    }
+
     /**
      * Gets the number of records in dataset {@code dataset}
      *
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
new file mode 100644
index 0000000..5934eae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicStatementsTest {
+    private static final String TEST_CONFIG_FILE_NAME = "cc.conf";
+    private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "src"
+            + File.separator + "main" + File.separator + "resources";
+    private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+    private static final String DATASET_NAME_PREFIX = "ds_";
+    private static final int NUM_DATASETS = 5;
+    private static final int BATCH_SIZE = 100;
+    private static final int NUM_UPSERTS = 100;
+    private static final int NUM_RECOVERIES = 10;
+
+    @Before
+    public void setUp() throws Exception {
+        integrationUtil.setGracefulShutdown(false);
+        integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
+        createDatasets();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(false);
+    }
+
+    private void createDatasets() throws Exception {
+        String datasetName;
+        for (int i = 0; i < NUM_DATASETS; i++) {
+            datasetName = DATASET_NAME_PREFIX + i;
+            TestDataUtil.createDatasetWithoutType(datasetName, Map.of("id", "uuid"), true);
+            TestDataUtil.createSecondaryBTreeIndex(datasetName, datasetName + "_sidx", "name:string");
+        }
+    }
+
+    private Thread insertRecords(String dataset) {
+        Thread thread = new Thread(() -> {
+            try {
+                for (int i = 0; i < NUM_UPSERTS; i++) {
+                    TestDataUtil.upsertBulkData(dataset, BATCH_SIZE);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        thread.start();
+        return thread;
+    }
+
+    @Test
+    public void testAtomicityWithFailures() throws Exception {
+        for (int i = 0; i <= NUM_RECOVERIES; i++) {
+            List<Thread> threads = new ArrayList<>();
+            for (int j = 0; j < NUM_DATASETS; j++) {
+                threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+            }
+            Random rnd = new Random();
+            Thread.sleep(rnd.nextInt(2000) + 500);
+            integrationUtil.deinit(false);
+            integrationUtil.init(false, TEST_CONFIG_FILE_PATH);
+
+            for (int j = 0; j < NUM_DATASETS; j++) {
+                final long countAfterRecovery = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j);
+                Assert.assertEquals(0, countAfterRecovery % BATCH_SIZE);
+            }
+        }
+    }
+
+    @Test
+    public void testAtomicityWithoutFailures() throws Exception {
+        List<Thread> threads = new ArrayList<>();
+        for (int j = 0; j < NUM_DATASETS; j++) {
+            threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+            threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+        }
+        for (Thread thread : threads) {
+            thread.join();
+        }
+        for (int j = 0; j < NUM_DATASETS; j++) {
+            long count = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j);
+            Assert.assertEquals(2 * NUM_UPSERTS * BATCH_SIZE, count);
+        }
+
+    }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 7e38b8b..34bc587 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -279,7 +279,7 @@
         for (FlushOperation flush : lastFlushOperation.values()) {
             FileReference target = flush.getTarget();
             Map<String, Object> map = flush.getParameters();
-            final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+            final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID);
             final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
             final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
             indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b
Gerrit-Change-Number: 17724
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <pe...@couchbase.com>
Gerrit-MessageType: newchange