You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this plain-text copy.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2021/09/14 16:53:11 UTC

[asterixdb] branch master updated: [NO ISSUE][REP] Add API to perform non-delta recovery for a replica

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f278c0  [NO ISSUE][REP] Add API to perform non-delta recovery for a replica
8f278c0 is described below

commit 8f278c042d92135f737cd7b26bd56a3479e11106
Author: Murtadha Hubail <mu...@couchbase.com>
AuthorDate: Tue Sep 14 02:12:15 2021 +0300

    [NO ISSUE][REP] Add API to perform non-delta recovery for a replica
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
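    - Add an option to perform non-delta recovery for a replica. When delta
      recovery is disabled, the master first sends the replica a new
      DELETE_PARTITION request, which deletes the replica's on-disk copy of
      the partition, and only then runs the regular file sync.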
    
    Change-Id: Ib1837e8f1aefdd9e085ccfd62f1c6e6d4eb969e8
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13223
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../asterix/replication/api/PartitionReplica.java  |  6 +-
 .../replication/messaging/DeletePartitionTask.java | 75 ++++++++++++++++++++++
 .../messaging/PartitionResourcesListResponse.java  |  2 +-
 .../replication/messaging/ReplicationProtocol.java |  5 +-
 .../replication/sync/ReplicaFilesSynchronizer.java | 14 +++-
 .../replication/sync/ReplicaSynchronizer.java      |  8 +--
 .../PersistentLocalResourceRepository.java         | 12 ++++
 7 files changed, 112 insertions(+), 10 deletions(-)

diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index e265d03..3b10700 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -79,17 +79,17 @@ public class PartitionReplica implements IPartitionReplica {
     }
 
     public synchronized void sync() {
-        sync(true);
+        sync(true, true);
     }
 
-    public synchronized void sync(boolean register) {
+    public synchronized void sync(boolean register, boolean deltaRecovery) {
         if (status == IN_SYNC || status == CATCHING_UP) {
             return;
         }
         setStatus(CATCHING_UP);
         appCtx.getThreadExecutor().execute(() -> {
             try {
-                new ReplicaSynchronizer(appCtx, this).sync(register);
+                new ReplicaSynchronizer(appCtx, this).sync(register, deltaRecovery);
                 setStatus(IN_SYNC);
             } catch (Exception e) {
                 LOGGER.error(() -> "Failed to sync replica " + this, e);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java
new file mode 100644
index 0000000..90139df
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeletePartitionTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.messaging;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class DeletePartitionTask implements IReplicaTask {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final int partitionId;
+
+    public DeletePartitionTask(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    @Override
+    public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
+        try {
+            PersistentLocalResourceRepository localResourceRepository =
+                    (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+            LOGGER.warn("deleting storage partition {}", partitionId);
+            localResourceRepository.deletePartition(partitionId);
+            ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
+        } catch (Exception e) {
+            throw new ReplicationException(e);
+        }
+    }
+
+    @Override
+    public ReplicationProtocol.ReplicationRequestType getMessageType() {
+        return ReplicationProtocol.ReplicationRequestType.DELETE_PARTITION;
+    }
+
+    @Override
+    public void serialize(OutputStream out) throws HyracksDataException {
+        try {
+            DataOutputStream dos = new DataOutputStream(out);
+            dos.writeInt(partitionId);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static DeletePartitionTask create(DataInput input) throws IOException {
+        return new DeletePartitionTask(input.readInt());
+    }
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
index a9921c6..1a5ba88 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -33,7 +33,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public class PartitionResourcesListResponse implements IReplicationMessage {
 
     private final int partition;
-    private Map<String, Long> partitionReplicatedResources;
+    private final Map<String, Long> partitionReplicatedResources;
     private final List<String> files;
     private final boolean owner;
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index ed2c93f..5b0c64e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -61,7 +61,8 @@ public class ReplicationProtocol {
         LSM_COMPONENT_MASK,
         MARK_COMPONENT_VALID,
         DROP_INDEX,
-        REPLICATE_LOGS
+        REPLICATE_LOGS,
+        DELETE_PARTITION
     }
 
     private static final Map<Integer, ReplicationRequestType> TYPES = new HashMap<>();
@@ -177,6 +178,8 @@ public class ReplicationProtocol {
                         return MarkComponentValidTask.create(dis);
                     case REPLICATE_LOGS:
                         return ReplicateLogsTask.create(dis);
+                    case DELETE_PARTITION:
+                        return DeletePartitionTask.create(dis);
                     default:
                         throw new IllegalStateException("Unrecognized replication message");
                 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index faf3f54..735318d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -35,6 +35,7 @@ import org.apache.asterix.common.storage.IndexCheckpoint;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.messaging.DeletePartitionTask;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
 import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
@@ -55,14 +56,19 @@ public class ReplicaFilesSynchronizer {
     private static final Logger LOGGER = LogManager.getLogger();
     private final PartitionReplica replica;
     private final INcApplicationContext appCtx;
+    private final boolean deltaRecovery;
 
-    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica) {
+    public ReplicaFilesSynchronizer(INcApplicationContext appCtx, PartitionReplica replica, boolean deltaRecovery) {
         this.appCtx = appCtx;
         this.replica = replica;
+        this.deltaRecovery = deltaRecovery;
     }
 
     public void sync() throws IOException {
         final int partition = replica.getIdentifier().getPartition();
+        if (!deltaRecovery) {
+            deletePartitionFromReplica(partition);
+        }
         PartitionResourcesListResponse replicaResourceResponse = getReplicaFiles(partition);
         Map<ResourceReference, Long> resourceReferenceLongMap = getValidReplicaResources(
                 replicaResourceResponse.getPartitionReplicatedResources(), replicaResourceResponse.isOwner());
@@ -82,6 +88,12 @@ public class ReplicaFilesSynchronizer {
         deleteReplicaExtraFiles(replicaFiles, masterFiles);
     }
 
+    private void deletePartitionFromReplica(int partitionId) throws IOException {
+        DeletePartitionTask deletePartitionTask = new DeletePartitionTask(partitionId);
+        ReplicationProtocol.sendTo(replica, deletePartitionTask);
+        ReplicationProtocol.waitForAck(replica);
+    }
+
     private void deleteReplicaExtraFiles(Set<String> replicaFiles, Set<String> masterFiles) {
         final List<String> replicaInvalidFiles =
                 replicaFiles.stream().filter(file -> !masterFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 6030245..05e2e75 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -43,13 +43,13 @@ public class ReplicaSynchronizer {
         this.replica = replica;
     }
 
-    public void sync(boolean register) throws IOException {
+    public void sync(boolean register, boolean deltaRecovery) throws IOException {
         synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
             final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager();
             try {
                 // suspend checkpointing datasets to prevent async IO operations while sync'ing replicas
                 checkpointManager.suspend();
-                syncFiles();
+                syncFiles(deltaRecovery);
                 checkpointReplicaIndexes();
                 if (register) {
                     appCtx.getReplicationManager().register(replica);
@@ -60,8 +60,8 @@ public class ReplicaSynchronizer {
         }
     }
 
-    private void syncFiles() throws IOException {
-        final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica);
+    private void syncFiles(boolean deltaRecovery) throws IOException {
+        final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery);
         // flush replicated dataset to generate disk component for any remaining in-memory components
         final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy();
         appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index ee5b16e..02f5772 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -695,4 +695,16 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         FileReference resolve = ioManager.resolve(path.toString());
         return resolve.getFile().toPath();
     }
+
+    public void deletePartition(int partitionId) {
+        List<File> onDiskPartitions = getOnDiskPartitions();
+        for (File onDiskPartition : onDiskPartitions) {
+            int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
+            if (partitionNum == partitionId) {
+                LOGGER.warn("deleting partition {}", partitionNum);
+                FileUtils.deleteQuietly(onDiskPartition);
+                return;
+            }
+        }
+    }
 }