You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2023/08/15 20:24:12 UTC
Change in asterixdb[master]: [NO ISSUE][TX][TEST] Add test for atomic statements
From Peeyush Gupta <pe...@couchbase.com>:
Peeyush Gupta has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724 )
Change subject: [NO ISSUE][TX][TEST] Add test for atomic statements
......................................................................
[NO ISSUE][TX][TEST] Add test for atomic statements
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add test that stops NCs at random times while upserts are
taking place on standalone collections.
- Fix the issue with rollback and recovery for atomic statements
Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
6 files changed, 167 insertions(+), 3 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/24/17724/1
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
index 71a641e..57a74f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/AtomicJobRollbackMessage.java
@@ -32,6 +32,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
/**
* Message sent from CC to all NCs to rollback an atomic statement/job.
@@ -43,6 +45,8 @@
private final List<Integer> datasetIds;
private final Map<String, ILSMComponentId> componentIdMap;
+ private static final Logger LOGGER = LogManager.getLogger();
+
public AtomicJobRollbackMessage(JobId jobId, List<Integer> datasetIds,
Map<String, ILSMComponentId> componentIdMap) {
this.jobId = jobId;
@@ -57,9 +61,12 @@
datasetLifecycleManager.getIndexCheckpointManagerProvider();
componentIdMap.forEach((k, v) -> {
try {
- IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(ResourceReference.of(k));
+ IIndexCheckpointManager checkpointManager =
+ indexCheckpointManagerProvider.get(ResourceReference.ofIndex(k));
if (checkpointManager.getCheckpointCount() > 0) {
IndexCheckpoint checkpoint = checkpointManager.getLatest();
+ LOGGER.debug("Removing checkpoint for resource {} for component id {}", k,
+ checkpoint.getLastComponentId());
if (checkpoint.getLastComponentId() == v.getMaxId()) {
checkpointManager.deleteLatest(v.getMaxId(), 1);
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
index 0936512..beaaac0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
@@ -70,6 +70,9 @@
deleteInvalidIndex(appContext, localResourceRepository, resource);
}
}
+ for (Integer partition : nodePartitions) {
+ localResourceRepository.cleanup(partition);
+ }
try {
broker.sendMessageToPrimaryCC(new VoidResponse(reqId, null));
} catch (Exception e) {
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3f76a31..df2c25d 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -112,12 +112,12 @@
LOGGER.info("Starting Global Recovery");
MetadataManager.INSTANCE.init();
MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ rollbackIncompleteAtomicTransactions(appCtx);
if (appCtx.getStorageProperties().isStorageGlobalCleanup()) {
int storageGlobalCleanupTimeout = appCtx.getStorageProperties().getStorageGlobalCleanupTimeout();
performGlobalStorageCleanup(mdTxnCtx, storageGlobalCleanupTimeout);
}
mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
- rollbackIncompleteAtomicTransactions(appCtx);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
recoveryCompleted = true;
recovering = false;
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 06380fe..a792b8d 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -90,6 +90,15 @@
OUTPUT_FORMAT);
}
+ public static void createDatasetWithoutType(String dataset, Map<String, String> fields, boolean pkAutogenerated)
+ throws Exception {
+ StringBuilder stringBuilder = new StringBuilder("");
+ fields.forEach((fName, fType) -> stringBuilder.append(fName).append(":").append(fType).append(","));
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ TEST_EXECUTOR.executeSqlppUpdateOrDdl("CREATE DATASET " + dataset + " PRIMARY KEY (" + stringBuilder + ")"
+ + (pkAutogenerated ? "AUTOGENERATED;" : ";"), OUTPUT_FORMAT);
+ }
+
/**
* Creates a secondary primary index
* @param dataset the name of the dataset
@@ -127,6 +136,16 @@
}
}
+ public static void upsertBulkData(String dataset, long count) throws Exception {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < count; i++) {
+ stringBuilder.append("{\"name\": \"name_" + i + "\"},");
+ }
+ stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+ TEST_EXECUTOR.executeSqlppUpdateOrDdl("INSERT INTO " + dataset + "([" + stringBuilder + "]);",
+ TestCaseContext.OutputFormat.CLEAN_JSON);
+ }
+
/**
* Gets the number of records in dataset {@code dataset}
*
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
new file mode 100644
index 0000000..5934eae
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/atomic_statements/AtomicStatementsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.atomic_statements;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AtomicStatementsTest {
+ private static final String TEST_CONFIG_FILE_NAME = "cc.conf";
+ private static final String TEST_CONFIG_PATH = System.getProperty("user.dir") + File.separator + "src"
+ + File.separator + "main" + File.separator + "resources";
+ private static final String TEST_CONFIG_FILE_PATH = TEST_CONFIG_PATH + File.separator + TEST_CONFIG_FILE_NAME;
+ private static final AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil();
+
+ private static final String DATASET_NAME_PREFIX = "ds_";
+ private static final int NUM_DATASETS = 5;
+ private static final int BATCH_SIZE = 100;
+ private static final int NUM_UPSERTS = 100;
+ private static final int NUM_RECOVERIES = 10;
+
+ @Before
+ public void setUp() throws Exception {
+ integrationUtil.setGracefulShutdown(false);
+ integrationUtil.init(true, TEST_CONFIG_FILE_PATH);
+ createDatasets();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ integrationUtil.deinit(false);
+ }
+
+ private void createDatasets() throws Exception {
+ String datasetName;
+ for (int i = 0; i < NUM_DATASETS; i++) {
+ datasetName = DATASET_NAME_PREFIX + i;
+ TestDataUtil.createDatasetWithoutType(datasetName, Map.of("id", "uuid"), true);
+ TestDataUtil.createSecondaryBTreeIndex(datasetName, datasetName + "_sidx", "name:string");
+ }
+ }
+
+ private Thread insertRecords(String dataset) {
+ Thread thread = new Thread(() -> {
+ try {
+ for (int i = 0; i < NUM_UPSERTS; i++) {
+ TestDataUtil.upsertBulkData(dataset, BATCH_SIZE);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ thread.start();
+ return thread;
+ }
+
+ @Test
+ public void testAtomicityWithFailures() throws Exception {
+ for (int i = 0; i <= NUM_RECOVERIES; i++) {
+ List<Thread> threads = new ArrayList<>();
+ for (int j = 0; j < NUM_DATASETS; j++) {
+ threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+ }
+ Random rnd = new Random();
+ Thread.sleep(rnd.nextInt(2000) + 500);
+ integrationUtil.deinit(false);
+ integrationUtil.init(false, TEST_CONFIG_FILE_PATH);
+
+ for (int j = 0; j < NUM_DATASETS; j++) {
+ final long countAfterRecovery = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j);
+ Assert.assertEquals(0, countAfterRecovery % BATCH_SIZE);
+ }
+ }
+ }
+
+ @Test
+ public void testAtomicityWithoutFailures() throws Exception {
+ List<Thread> threads = new ArrayList<>();
+ for (int j = 0; j < NUM_DATASETS; j++) {
+ threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+ threads.add(insertRecords(DATASET_NAME_PREFIX + j));
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ for (int j = 0; j < NUM_DATASETS; j++) {
+ long count = TestDataUtil.getDatasetCount(DATASET_NAME_PREFIX + j);
+ Assert.assertEquals(2 * NUM_UPSERTS * BATCH_SIZE, count);
+ }
+
+ }
+}
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 7e38b8b..34bc587 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -279,7 +279,7 @@
for (FlushOperation flush : lastFlushOperation.values()) {
FileReference target = flush.getTarget();
Map<String, Object> map = flush.getParameters();
- final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+ final LSMComponentId id = (LSMComponentId) map.get(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID);
final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
indexCheckpointManagerProvider.get(ref).flushed(componentSequence, 0L, id.getMaxId());
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17724
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Ifcaa65690ca99681cc5bebd8f220e5389298d61b
Gerrit-Change-Number: 17724
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <pe...@couchbase.com>
Gerrit-MessageType: newchange