You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu> on 2022/04/18 17:38:31 UTC

Change in asterixdb[master]: [NO ISSUE][OTH] Txn Logs Replication Trace Logs

From Dmitry Lychagin <dm...@couchbase.com>:

Hello Murtadha Hubail, Ali Alsuliman, Jenkins,

I'd like you to do a code review. Please visit

    https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16126

to review the following change.


Change subject: [NO ISSUE][OTH] Txn Logs Replication Trace Logs
......................................................................

[NO ISSUE][OTH] Txn Logs Replication Trace Logs

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

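- Add debug logs for txn logs replication (plus an info log when log
  replication with a remote sender ends).
- Add IReplicationWorker#getRemoteAddress() so that log messages can
  identify the sender.
- Flush the socket output stream in NetworkingUtil after the request
  buffer has been fully written.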

Change-Id: Id4a98e30763f9a86952e1dc1c226af89dddc2b0a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15865
Reviewed-by: Ali Alsuliman <al...@gmail.com>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
8 files changed, 32 insertions(+), 1 deletion(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/26/16126/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 96e89a7..42159be 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -102,6 +102,7 @@
     public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
         final IndexCheckpoint latest = getLatest();
         latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
+        LOGGER.debug("index {} master flush {} -> {}", indexPath, masterLsn, localLsn);
         final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
                 latest.getValidComponentSequence(), latest.getLastComponentId(), null);
         persist(next);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
index c7b2561..d6cccc0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
@@ -35,4 +35,9 @@
      * @return the reusable buffer
      */
     ByteBuffer getReusableBuffer();
+
+    /**
+     * @return The remote address of the sender
+     */
+    String getRemoteAddress();
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
index b094d9e..63e194e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
@@ -47,7 +47,8 @@
     public void process(ByteBuffer logsBatch, RemoteLogRecord reusableLog, IReplicationWorker worker) {
         while (logsBatch.hasRemaining()) {
             // get rid of log size
-            logsBatch.getInt();
+            int batchSize = logsBatch.getInt();
+            LOGGER.debug("received logs batch size {} from {}", batchSize, worker.getRemoteAddress());
             reusableLog.readRemoteLog(logsBatch);
             reusableLog.setLogSource(LogSource.REMOTE);
             switch (reusableLog.getLogType()) {
@@ -74,6 +75,8 @@
                     flushLog.setRequester(this);
                     flushLog.setLogSource(LogSource.REMOTE);
                     flushLog.setMasterLsn(reusableLog.getLSN());
+                    LOGGER.debug("received master LSN {} for partition {}", reusableLog.getLSN(),
+                            reusableLog.getResourcePartition());
                     logManager.log(flushLog);
                     break;
                 default:
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 2ed2ac9..3c13825 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -23,9 +23,12 @@
 
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.replication.management.LogReplicationManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class ReplicationLogBuffer {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private final int logBufferSize;
     private final AtomicBoolean full;
     private int appendOffset;
@@ -117,6 +120,7 @@
     private void transferBuffer(ByteBuffer buffer) {
         if (buffer.remaining() <= batchSize) {
             //the current batch can be sent as it is
+            LOGGER.debug("sending txn logs batch size {}", buffer.remaining());
             replicationManager.transferBatch(buffer);
             return;
         }
@@ -142,6 +146,7 @@
                 //return to the beginning of the batch position
                 buffer.reset();
             }
+            LOGGER.debug("sending logs slice size {}", buffer.remaining());
             replicationManager.transferBatch(buffer);
             //return the original limit to check the new remaining size
             buffer.limit(totalTransferLimit);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index f1d8d4d..b76fa25 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -131,6 +131,9 @@
                 ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
             }
         }
+        if (logRecord.getLogType() == LogType.FLUSH) {
+            LOGGER.debug("appending flush lsn {} to replication queue", logRecord.getLSN());
+        }
         appendToLogBuffer(logRecord);
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 7f6439c..736b54e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -114,6 +114,7 @@
         while (requestBuffer.hasRemaining()) {
             socketChannel.write(requestBuffer);
         }
+        socketChannel.getSocketChannel().socket().getOutputStream().flush();
     }
 
     //unused
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 3dc094e..19de02a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -151,6 +151,15 @@
             return outBuffer;
         }
 
+        @Override
+        public String getRemoteAddress() {
+            try {
+                return socketChannel.getSocketChannel().getRemoteAddress().toString();
+            } catch (Exception e) {
+                return "unknown";
+            }
+        }
+
         private void handle(ReplicationRequestType requestType) throws HyracksDataException {
             final IReplicaTask task =
                     (IReplicaTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
index d9357df..67c8eba 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
@@ -34,12 +34,15 @@
 import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * A task to replicate transaction logs from master replica
  */
 public class ReplicateLogsTask implements IReplicaTask {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     public static final int END_REPLICATION_LOG_SIZE = 1;
     private final String nodeId;
 
@@ -61,6 +64,7 @@
                 logsBuffer = ReplicationProtocol.readRequest(channel, logsBuffer);
                 // check if it is end of handshake
                 if (logsBuffer.remaining() == END_REPLICATION_LOG_SIZE) {
+                    LOGGER.info("ending log replication with {}", worker.getRemoteAddress());
                     break;
                 }
                 logsProcessor.process(logsBuffer, reusableLog, worker);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16126
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Id4a98e30763f9a86952e1dc1c226af89dddc2b0a
Gerrit-Change-Number: 16126
Gerrit-PatchSet: 1
Gerrit-Owner: Dmitry Lychagin <dm...@couchbase.com>
Gerrit-Reviewer: Ali Alsuliman <al...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange

Change in asterixdb[master]: [NO ISSUE][OTH] Txn Logs Replication Trace Logs

Posted by AsterixDB Code Review <do...@asterix-gerrit.ics.uci.edu>.
From Dmitry Lychagin <dm...@couchbase.com>:

Hello Murtadha Hubail, Ali Alsuliman, Jenkins,

I'd like you to do a code review. Please visit

    https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16126

to review the following change.


Change subject: [NO ISSUE][OTH] Txn Logs Replication Trace Logs
......................................................................

[NO ISSUE][OTH] Txn Logs Replication Trace Logs

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

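- Add debug logs for txn logs replication (plus an info log when log
  replication with a remote sender ends).
- Add IReplicationWorker#getRemoteAddress() so that log messages can
  identify the sender.
- Flush the socket output stream in NetworkingUtil after the request
  buffer has been fully written.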

Change-Id: Id4a98e30763f9a86952e1dc1c226af89dddc2b0a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15865
Reviewed-by: Ali Alsuliman <al...@gmail.com>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
8 files changed, 32 insertions(+), 1 deletion(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/26/16126/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 96e89a7..42159be 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -102,6 +102,7 @@
     public synchronized void masterFlush(long masterLsn, long localLsn) throws HyracksDataException {
         final IndexCheckpoint latest = getLatest();
         latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
+        LOGGER.debug("index {} master flush {} -> {}", indexPath, masterLsn, localLsn);
         final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(),
                 latest.getValidComponentSequence(), latest.getLastComponentId(), null);
         persist(next);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
index c7b2561..d6cccc0 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/IReplicationWorker.java
@@ -35,4 +35,9 @@
      * @return the reusable buffer
      */
     ByteBuffer getReusableBuffer();
+
+    /**
+     * @return The remote address of the sender
+     */
+    String getRemoteAddress();
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
index b094d9e..63e194e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java
@@ -47,7 +47,8 @@
     public void process(ByteBuffer logsBatch, RemoteLogRecord reusableLog, IReplicationWorker worker) {
         while (logsBatch.hasRemaining()) {
             // get rid of log size
-            logsBatch.getInt();
+            int batchSize = logsBatch.getInt();
+            LOGGER.debug("received logs batch size {} from {}", batchSize, worker.getRemoteAddress());
             reusableLog.readRemoteLog(logsBatch);
             reusableLog.setLogSource(LogSource.REMOTE);
             switch (reusableLog.getLogType()) {
@@ -74,6 +75,8 @@
                     flushLog.setRequester(this);
                     flushLog.setLogSource(LogSource.REMOTE);
                     flushLog.setMasterLsn(reusableLog.getLSN());
+                    LOGGER.debug("received master LSN {} for partition {}", reusableLog.getLSN(),
+                            reusableLog.getResourcePartition());
                     logManager.log(flushLog);
                     break;
                 default:
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 2ed2ac9..3c13825 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -23,9 +23,12 @@
 
 import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.replication.management.LogReplicationManager;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class ReplicationLogBuffer {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private final int logBufferSize;
     private final AtomicBoolean full;
     private int appendOffset;
@@ -117,6 +120,7 @@
     private void transferBuffer(ByteBuffer buffer) {
         if (buffer.remaining() <= batchSize) {
             //the current batch can be sent as it is
+            LOGGER.debug("sending txn logs batch size {}", buffer.remaining());
             replicationManager.transferBatch(buffer);
             return;
         }
@@ -142,6 +146,7 @@
                 //return to the beginning of the batch position
                 buffer.reset();
             }
+            LOGGER.debug("sending logs slice size {}", buffer.remaining());
             replicationManager.transferBatch(buffer);
             //return the original limit to check the new remaining size
             buffer.limit(totalTransferLimit);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
index f1d8d4d..b76fa25 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -131,6 +131,9 @@
                 ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
             }
         }
+        if (logRecord.getLogType() == LogType.FLUSH) {
+            LOGGER.debug("appending flush lsn {} to replication queue", logRecord.getLSN());
+        }
         appendToLogBuffer(logRecord);
     }
 
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 7f6439c..736b54e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -114,6 +114,7 @@
         while (requestBuffer.hasRemaining()) {
             socketChannel.write(requestBuffer);
         }
+        socketChannel.getSocketChannel().socket().getOutputStream().flush();
     }
 
     //unused
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 3dc094e..19de02a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -151,6 +151,15 @@
             return outBuffer;
         }
 
+        @Override
+        public String getRemoteAddress() {
+            try {
+                return socketChannel.getSocketChannel().getRemoteAddress().toString();
+            } catch (Exception e) {
+                return "unknown";
+            }
+        }
+
         private void handle(ReplicationRequestType requestType) throws HyracksDataException {
             final IReplicaTask task =
                     (IReplicaTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
index d9357df..67c8eba 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateLogsTask.java
@@ -34,12 +34,15 @@
 import org.apache.asterix.replication.management.ReplicationChannel;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * A task to replicate transaction logs from master replica
  */
 public class ReplicateLogsTask implements IReplicaTask {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     public static final int END_REPLICATION_LOG_SIZE = 1;
     private final String nodeId;
 
@@ -61,6 +64,7 @@
                 logsBuffer = ReplicationProtocol.readRequest(channel, logsBuffer);
                 // check if it is end of handshake
                 if (logsBuffer.remaining() == END_REPLICATION_LOG_SIZE) {
+                    LOGGER.info("ending log replication with {}", worker.getRemoteAddress());
                     break;
                 }
                 logsProcessor.process(logsBuffer, reusableLog, worker);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16126
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Id4a98e30763f9a86952e1dc1c226af89dddc2b0a
Gerrit-Change-Number: 16126
Gerrit-PatchSet: 1
Gerrit-Owner: Dmitry Lychagin <dm...@couchbase.com>
Gerrit-Reviewer: Ali Alsuliman <al...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mh...@apache.org>
Gerrit-MessageType: newchange