You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2021/09/02 16:44:18 UTC

[asterixdb] branch master updated: [NO ISSUE][REP] Add replica sync progress

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a1de795  [NO ISSUE][REP] Add replica sync progress
a1de795 is described below

commit a1de795e82273b128b706794988fb7dd09a0267d
Author: Murtadha Hubail <mu...@couchbase.com>
AuthorDate: Thu Sep 2 13:35:49 2021 +0300

    [NO ISSUE][REP] Add replica sync progress
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Add replica sync progress based on the replica's missing
      files.
    - Add a replica last-progress timestamp that can be used
      to detect inactivity in a replica's sync progress.
    
    Change-Id: Iab2cd7e745c4150e2d0aef3af864ec0f66dd96e7
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13063
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../common/replication/IPartitionReplica.java      | 14 +++++++++
 .../asterix/replication/api/PartitionReplica.java  | 36 ++++++++++++++++++++++
 .../replication/sync/ReplicaFilesSynchronizer.java |  7 ++++-
 3 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
index 761b2c6..f311655 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
@@ -51,4 +51,18 @@ public interface IPartitionReplica {
      * @param failure
      */
     void notifyFailure(Exception failure);
+
+    /**
+     * Gets the current sync progress
+     *
+     * @return the current sync progress
+     */
+    double getSyncProgress();
+
+    /**
+     * Gets the last progress time of this replica based on System.nanoTime
+     *
+     * @return the last progress time
+     */
+    long getLastProgressTime();
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 282d475..e265d03 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -52,6 +52,8 @@ public class PartitionReplica implements IPartitionReplica {
     private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
     private final INcApplicationContext appCtx;
     private final ReplicaIdentifier id;
+    private double syncProgress = -1;
+    private long lastProgressTime = -1;
     private ByteBuffer reusbaleBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
     private ISocketChannel sc;
@@ -133,6 +135,16 @@ public class PartitionReplica implements IPartitionReplica {
         return reusbaleBuf;
     }
 
+    public synchronized void setSyncProgress(double syncProgress) {
+        this.syncProgress = syncProgress;
+        lastProgressTime = System.nanoTime();
+    }
+
+    @Override
+    public synchronized double getSyncProgress() {
+        return syncProgress;
+    }
+
     private JsonNode asJson() {
         ObjectNode json = OBJECT_MAPPER.createObjectNode();
         json.put("id", id.toString());
@@ -153,6 +165,19 @@ public class PartitionReplica implements IPartitionReplica {
     }
 
     @Override
+    public synchronized long getLastProgressTime() {
+        switch (status) {
+            case IN_SYNC:
+                return System.nanoTime();
+            case CATCHING_UP:
+                return lastProgressTime;
+            case DISCONNECTED:
+                return -1;
+        }
+        return -1;
+    }
+
+    @Override
     public int hashCode() {
         return id.hashCode();
     }
@@ -172,6 +197,17 @@ public class PartitionReplica implements IPartitionReplica {
         }
         LOGGER.info(() -> "Replica " + this + " status changing: " + this.status + " -> " + status);
         this.status = status;
+        switch (status) {
+            case IN_SYNC:
+                syncProgress = 1;
+                break;
+            case CATCHING_UP:
+                lastProgressTime = System.nanoTime();
+                break;
+            case DISCONNECTED:
+                syncProgress = -1;
+                break;
+        }
     }
 
     private void sendGoodBye() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 3c93d17..faf3f54 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -105,7 +105,12 @@ public class ReplicaFilesSynchronizer {
         final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
         // sort files to ensure index metadata files starting with "." are replicated first
         files.sort(String::compareTo);
-        files.forEach(sync::replicate);
+        int missingFilesCount = files.size();
+        for (int i = 0; i < missingFilesCount; i++) {
+            String file = files.get(i);
+            sync.replicate(file);
+            replica.setSyncProgress((i + 1d) / missingFilesCount);
+        }
     }
 
     private void deleteInvalidFiles(List<String> files) {