You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org> on 2016/05/24 18:44:54 UTC

Change in asterixdb[master]: Txn Log Replication Optimizations

Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/883

Change subject: Txn Log Replication Optimizations
......................................................................

Txn Log Replication Optimizations

- Add configuration properties for txn log replication.
- Establish a handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Avoid creating a HashMap iterator object for each log batch.
- Fix synchronization of jobs' ACKs from remote replicas.
- Fix bugs in the replication of large objects.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
16 files changed, 437 insertions(+), 338 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/83/883/1

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 0a6d62d..c71d77e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -150,7 +150,7 @@
         }
     }
 
-    private void startReplicationService() {
+    private void startReplicationService() throws InterruptedException {
         //Open replication channel
         runtimeContext.getReplicationChannel().start();
 
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index b1d0649..1d693f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,10 +98,4 @@
     <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
     </description>
   </property>
-  <property>
-    <name>log.level</name>
-    <value>WARNING</value>
-    <description>The minimum log level to be displayed. (Default = INFO)
-    </description>
-  </property>
 </asterixConfiguration>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 5d31d9a..90ec60b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -26,6 +26,8 @@
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 public class AsterixReplicationProperties extends AbstractAsterixProperties {
 
@@ -38,6 +40,16 @@
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
+
+    private static final String REPLICATION_LOG_BATCH_SIZE_KEY = "replication.log.batchsize";
+    private static final int REPLICATION_LOG_BATCH_SIZE_DEFAULT = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+
+    private static final String REPLICATION_LOG_BUFFER_NUM_PAGES_KEY = "replication.log.buffer.numpages";
+    private static int REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT = 8;
+
+    private static final String REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY = "replication.log.buffer.pagesize";
+    private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
+            StorageUnit.KILOBYTE);
 
     public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) {
         super(accessor);
@@ -245,4 +257,19 @@
     public int getMaxRemoteRecoveryAttempts() {
         return MAX_REMOTE_RECOVERY_ATTEMPTS;
     }
+
+    public int getLogBufferPageSize() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY, REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBufferNumOfPages() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_NUM_PAGES_KEY, REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBatchSize() {
+        return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index a2738e8..9f9d74b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -43,6 +43,7 @@
      * Requests the remaining LSM disk components files from active remote replicas.
      *
      * @throws IOException
+     * @throws InterruptedException
      */
-    public void completeFailbackProcess() throws IOException;
+    public void completeFailbackProcess() throws IOException, InterruptedException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 755fbbd..6bd1505 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.replication;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Set;
 
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -31,8 +32,9 @@
      *
      * @param logRecord
      *            The log record to be replicated,
+     * @throws InterruptedException
      */
-    public void replicateLog(ILogRecord logRecord);
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException;
 
     /**
      * Checks whether a log record has been replicated
@@ -79,8 +81,10 @@
 
     /**
      * Starts processing of ASYNC replication jobs as well as Txn logs.
+     *
+     * @throws InterruptedException
      */
-    public void startReplicationThreads();
+    public void startReplicationThreads() throws InterruptedException;
 
     /**
      * Checks and sets each remote replica state.
@@ -114,4 +118,13 @@
      * @throws IOException
      */
     public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
+
+    /**
+     * Transfers the contents of the {@code buffer} to active remote replicas.
+     * The transfer starts from the {@code buffer} current position to its limit.
+     * After the transfer, the {@code buffer} position will be its limit.
+     *
+     * @param buffer
+     */
+    public void replicateTxnLogBatch(ByteBuffer buffer);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index 4c3f728..b8fe4b2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -62,8 +62,7 @@
     public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) {
         String replicaIPAddress = node.getClusterIp();
         int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId());
-        InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
-        return replicaAddress;
+        return InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
     }
 
     public static Replica create(DataInput input) throws IOException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 3738cd1..1992a00 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -110,9 +110,7 @@
 
     public String getNodeId();
 
-    public int writeRemoteRecoveryLog(ByteBuffer buffer);
-
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog);
+    public void readRemoteLog(ByteBuffer buffer);
 
     public void setReplicationThread(IReplicationThread replicationThread);
 
@@ -120,11 +118,7 @@
 
     public byte getLogSource();
 
-    public int getSerializedLogSize();
-
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN);
-
-    public ByteBuffer getSerializedLog();
+    public int getRemoteLogSize();
 
     public void setNodeId(String nodeId);
 
@@ -138,4 +132,6 @@
      * @return a flag indicating whether the log record should be sent to remote replicas
      */
     public boolean isReplicated();
+
+    public void writeRemoteLogRecord(ByteBuffer buffer);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index fd56913..88698e3 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -44,8 +44,7 @@
  * PKValueSize(4)
  * PKValue(PKValueSize)
  * ---------------------------
- * [Header3] (20 bytes) : only for update log type
- * PrevLSN(8)
+ * [Header3] (12 bytes) : only for update log type
  * ResourceId(8) //stored in .metadata of the corresponding index in NC node
  * LogRecordSize(4)
  * ---------------------------
@@ -71,6 +70,24 @@
  */
 
 public class LogRecord implements ILogRecord {
+
+    private static final int LOG_SOURCE_LEN = Byte.BYTES;
+    private static final int TYPE_LEN = Byte.BYTES;
+    public static final int PKHASH_LEN = Integer.BYTES;
+    public static final int PKSZ_LEN = Integer.BYTES;
+    private static final int RS_PARTITION_LEN = Integer.BYTES;
+    private static final int RSID_LEN = Long.BYTES;
+    private static final int LOGRCD_SZ_LEN = Integer.BYTES;
+    private static final int FLDCNT_LEN = Integer.BYTES;
+    private static final int NEWOP_LEN = Byte.BYTES;
+    private static final int NEWVALSZ_LEN = Integer.BYTES;
+    private static final int CHKSUM_LEN = Long.BYTES;
+
+    private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+    private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
+            + PKSZ_LEN;
+    private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
 
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
@@ -101,9 +118,8 @@
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
     private IReplicationThread replicationThread;
-    private ByteBuffer serializedLog;
     /**
-     * The fields (numOfFlushedIndexes and nodeId) are used for serialized flush logs only
+     * The fields (numOfFlushedIndexes and nodeId) are used for remote flush logs only
      * to indicate the source of the log and how many indexes were flushed using its LSN.
      */
     private int numOfFlushedIndexes;
@@ -118,25 +134,6 @@
         checksumGen = new CRC32();
         logSource = LogSource.LOCAL;
     }
-
-    private final static int LOG_SOURCE_LEN = Byte.BYTES;
-    private final static int TYPE_LEN = Byte.BYTES;
-    public final static int PKHASH_LEN = Integer.BYTES;
-    public final static int PKSZ_LEN = Integer.BYTES;
-    private final static int RS_PARTITION_LEN = Integer.BYTES;
-    private final static int RSID_LEN = Long.BYTES;
-    private final static int LOGRCD_SZ_LEN = Integer.BYTES;
-    private final static int FLDCNT_LEN = Integer.BYTES;
-    private final static int NEWOP_LEN = Byte.BYTES;
-    private final static int NEWVALSZ_LEN = Integer.BYTES;
-    private final static int CHKSUM_LEN = Long.BYTES;
-
-    private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
-    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
-            + PKSZ_LEN;
-    private final static int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-    private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
 
     private void writeLogRecordCommonFields(ByteBuffer buffer) {
         buffer.put(logSource);
@@ -173,39 +170,15 @@
         buffer.putLong(checksum);
     }
 
-    // this method is used when replication is enabled to include the log record LSN in the serialized version
     @Override
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
-        int beginOffset = buffer.position();
+    public void writeRemoteLogRecord(ByteBuffer buffer) {
         writeLogRecordCommonFields(buffer);
-
-        if (replicated) {
-            //copy the serialized log to send it to replicas
-            int serializedLogSize = getSerializedLogSize();
-            if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
-                serializedLog = ByteBuffer.allocate(serializedLogSize);
-            } else {
-                serializedLog.clear();
-            }
-
-            int currentPosition = buffer.position();
-            int currentLogSize = (currentPosition - beginOffset);
-
-            buffer.position(beginOffset);
-            buffer.get(serializedLog.array(), 0, currentLogSize);
-            serializedLog.position(currentLogSize);
-            if (logType == LogType.FLUSH) {
-                serializedLog.putLong(appendLSN);
-                serializedLog.putInt(numOfFlushedIndexes);
-                serializedLog.putInt(nodeId.length());
-                serializedLog.put(nodeId.getBytes());
-            }
-            serializedLog.flip();
-            buffer.position(currentPosition);
+        if (logType == LogType.FLUSH) {
+            buffer.putLong(LSN);
+            buffer.putInt(numOfFlushedIndexes);
+            buffer.putInt(nodeId.length());
+            buffer.put(nodeId.getBytes());
         }
-
-        checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
-        buffer.putLong(checksum);
     }
 
     private void writePKValue(ByteBuffer buffer) {
@@ -221,8 +194,12 @@
     }
 
     private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
-        tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
-        // writeTuple() doesn't change the position of the buffer.
+        if (logSource == LogSource.LOCAL) {
+            tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+        } else {
+            //since the tuple is already serialized in remote logs, just copy it from beginning to end.
+            System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), buffer.position(), size);
+        }
         buffer.position(buffer.position() + size);
     }
 
@@ -323,47 +300,19 @@
     }
 
     @Override
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog) {
-        int beginOffset = buffer.position();
-
+    public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
-        RecordReadStatus status = readLogCommonFields(buffer);
-        if (status != RecordReadStatus.OK) {
-            buffer.position(beginOffset);
-            return status;
-        }
+        readLogCommonFields(buffer);
 
         if (logType == LogType.FLUSH) {
-            if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
-                LSN = buffer.getLong();
-                numOfFlushedIndexes = buffer.getInt();
-                //read serialized node id
-                int nodeIdLength = buffer.getInt();
-                if (buffer.remaining() >= nodeIdLength) {
-                    byte[] nodeIdBytes = new byte[nodeIdLength];
-                    buffer.get(nodeIdBytes);
-                    nodeId = new String(nodeIdBytes);
-                } else {
-                    buffer.position(beginOffset);
-                    return RecordReadStatus.TRUNCATED;
-                }
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
+            LSN = buffer.getLong();
+            numOfFlushedIndexes = buffer.getInt();
+            //read serialized node id
+            int nodeIdLength = buffer.getInt();
+            byte[] nodeIdBytes = new byte[nodeIdLength];
+            buffer.get(nodeIdBytes);
+            nodeId = new String(nodeIdBytes);
         }
-
-        //remote recovery logs need to have the LSN to check which should be replayed
-        if (remoteRecoveryLog) {
-            if (buffer.remaining() >= Long.BYTES) {
-                LSN = buffer.getLong();
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
-        }
-
-        return RecordReadStatus.OK;
     }
 
     private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -443,16 +392,6 @@
             builder.append(" ResourceId : ").append(resourceId);
         }
         return builder.toString();
-    }
-
-    @Override
-    public int writeRemoteRecoveryLog(ByteBuffer buffer) {
-        int bufferBegin = buffer.position();
-        writeLogRecordCommonFields(buffer);
-        //FLUSH logs should not included in remote recovery
-        //LSN must be included in all remote recovery logs
-        buffer.putLong(LSN);
-        return buffer.position() - bufferBegin;
     }
 
     ////////////////////////////////////////////
@@ -535,18 +474,18 @@
     }
 
     @Override
-    public int getSerializedLogSize() {
-        int serilizedSize = logSize;
+    public int getRemoteLogSize() {
+        int remoteLogSize = logSize;
         if (logType == LogType.FLUSH) {
             //LSN
-            serilizedSize += Long.BYTES;
+            remoteLogSize += Long.BYTES;
             //num of indexes
-            serilizedSize += Integer.BYTES;
+            remoteLogSize += Integer.BYTES;
             //serialized node id String
-            serilizedSize += Integer.BYTES + nodeId.length();
+            remoteLogSize += Integer.BYTES + nodeId.length();
         }
-        serilizedSize -= CHKSUM_LEN;
-        return serilizedSize;
+        remoteLogSize -= CHKSUM_LEN;
+        return remoteLogSize;
     }
 
     @Override
@@ -628,15 +567,6 @@
 
     public PrimaryIndexOperationTracker getOpTracker() {
         return opTracker;
-    }
-
-    @Override
-    public ByteBuffer getSerializedLog() {
-        return serializedLog;
-    }
-
-    public void setSerializedLog(ByteBuffer serializedLog) {
-        this.serializedLog = serializedLog;
     }
 
     @Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 588968c..6afba79 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -18,53 +18,42 @@
  */
 package org.apache.asterix.replication.logging;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.management.ReplicationManager;
 
 public class ReplicationLogBuffer {
     private final int logBufferSize;
     private final AtomicBoolean full;
     private int appendOffset;
-    private int flushOffset;
+    private int replicationOffset;
     private final ByteBuffer appendBuffer;
-    private final ByteBuffer flushBuffer;
+    private final ByteBuffer replicationBuffer;
     private boolean stop;
-    private Map<String, SocketChannel> replicaSockets;
     private ReplicationManager replicationManager;
+    private final int batchSize;
 
-    public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize) {
+    public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize, int batchSize) {
         this.replicationManager = replicationManager;
         this.logBufferSize = logBufferSize;
+        this.batchSize = batchSize;
         appendBuffer = ByteBuffer.allocate(logBufferSize);
-        flushBuffer = appendBuffer.duplicate();
+        replicationBuffer = appendBuffer.duplicate();
         full = new AtomicBoolean(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
     }
 
     public void append(ILogRecord logRecord) {
-        appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
-        appendBuffer.putInt(logRecord.getSerializedLogSize());
-        appendBuffer.put(logRecord.getSerializedLog());
+        appendBuffer.putInt(logRecord.getRemoteLogSize());
+        logRecord.writeRemoteLogRecord(appendBuffer);
 
         synchronized (this) {
             appendOffset += getLogReplicationSize(logRecord);
             this.notify();
         }
-    }
-
-    public void setReplicationSockets(Map<String, SocketChannel> replicaSockets) {
-        this.replicaSockets = replicaSockets;
     }
 
     public synchronized void isFull(boolean full) {
@@ -77,18 +66,18 @@
     }
 
     private static int getLogReplicationSize(ILogRecord logRecord) {
-        //request type + request length + serialized log length
-        return Integer.BYTES + Integer.BYTES + logRecord.getSerializedLogSize();
+        //log length (4 bytes) + remote log size
+        return Integer.BYTES + logRecord.getRemoteLogSize();
     }
 
     public void reset() {
         appendBuffer.position(0);
         appendBuffer.limit(logBufferSize);
-        flushBuffer.position(0);
-        flushBuffer.limit(logBufferSize);
+        replicationBuffer.position(0);
+        replicationBuffer.limit(logBufferSize);
         full.set(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
         stop = false;
     }
 
@@ -96,7 +85,7 @@
         int endOffset;
         while (!full.get()) {
             synchronized (this) {
-                if (appendOffset - flushOffset == 0 && !full.get()) {
+                if (appendOffset - replicationOffset == 0 && !full.get()) {
                     try {
                         if (stop) {
                             break;
@@ -108,45 +97,51 @@
                 }
                 endOffset = appendOffset;
             }
-            internalFlush(flushOffset, endOffset);
+            internalFlush(replicationOffset, endOffset);
         }
-
-        internalFlush(flushOffset, appendOffset);
+        internalFlush(replicationOffset, appendOffset);
     }
 
     private void internalFlush(int beginOffset, int endOffset) {
         if (endOffset > beginOffset) {
-            int begingPos = flushBuffer.position();
-            flushBuffer.limit(endOffset);
-            sendRequest(replicaSockets, flushBuffer);
-            flushBuffer.position(begingPos + (endOffset - beginOffset));
-            flushOffset = endOffset;
+            int begingPos = replicationBuffer.position();
+            replicationBuffer.limit(endOffset);
+            transferBuffer(replicationBuffer);
+            replicationBuffer.position(begingPos + (endOffset - beginOffset));
+            replicationOffset = endOffset;
         }
     }
 
-    private void sendRequest(Map<String, SocketChannel> replicaSockets, ByteBuffer requestBuffer) {
-        Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
-        int begin = requestBuffer.position();
-        while (iterator.hasNext()) {
-            Entry<String, SocketChannel> replicaSocket = iterator.next();
-            SocketChannel clientSocket = replicaSocket.getValue();
-            try {
-                NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
-            } catch (IOException e) {
-                if (clientSocket.isOpen()) {
-                    try {
-                        clientSocket.close();
-                    } catch (IOException e2) {
-                        e2.printStackTrace();
+    private void transferBuffer(ByteBuffer buffer) {
+        if (buffer.remaining() <= batchSize) {
+            replicationManager.replicateTxnLogBatch(buffer);
+        } else {
+            int totalTransferLimit = buffer.limit();
+            while (buffer.hasRemaining()) {
+                if (buffer.remaining() > batchSize) {
+                    /**
+                     * break into smaller batches
+                     */
+                    //mark the beginning of this batch
+                    buffer.mark();
+                    int currentBatchSize = 0;
+                    while (currentBatchSize < batchSize) {
+                        int logSize = replicationBuffer.getInt();
+                        //add the size of the log record itself + 4 bytes for its size 
+                        currentBatchSize += logSize + Integer.BYTES;
+                        //go to the beginning of the next log
+                        buffer.position(buffer.position() + logSize);
                     }
+                    //set the limit to the end of this batch
+                    buffer.limit(buffer.position());
+                    //return to the beginning of the batch position
+                    buffer.reset();
                 }
-                replicationManager.reportFailedReplica(replicaSocket.getKey());
-                iterator.remove();
-            } finally {
-                requestBuffer.position(begin);
+                replicationManager.replicateTxnLogBatch(buffer);
+                //return the original limit to check the new remaining size
+                buffer.limit(totalTransferLimit);
             }
         }
-
     }
 
     public boolean isStop() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
similarity index 83%
rename from asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
rename to asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
index 3312cb1..d9d60d6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
@@ -21,24 +21,22 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
 import java.util.logging.Logger;
-
-import org.apache.asterix.common.transactions.LogRecord;
 
 /**
  * This class is responsible for sending transactions logs to remote replicas.
  */
-public class ReplicationLogFlusher implements Callable<Boolean> {
-    private final static Logger LOGGER = Logger.getLogger(ReplicationLogFlusher.class.getName());
-    private final static ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null,
-            LogRecord.JOB_TERMINATE_LOG_SIZE);
+public class TxnLogReplicator implements Callable<Boolean> {
+    private static final Logger LOGGER = Logger.getLogger(TxnLogReplicator.class.getName());
+    private static final ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null, 0, 0);
     private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ;
     private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ;
     private ReplicationLogBuffer flushPage;
     private final AtomicBoolean isStarted;
     private final AtomicBoolean terminateFlag;
 
-    public ReplicationLogFlusher(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
+    public TxnLogReplicator(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
             LinkedBlockingQueue<ReplicationLogBuffer> flushQ) {
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
@@ -74,7 +72,7 @@
 
     @Override
     public Boolean call() {
-        Thread.currentThread().setName("Replication Log Flusher");
+        Thread.currentThread().setName("TxnLog Replicator");
         synchronized (isStarted) {
             isStarted.set(true);
             isStarted.notify();
@@ -91,6 +89,7 @@
                     if (flushPage == null) {
                         continue;
                     }
+                    Thread.currentThread().interrupt(); // re-set this thread's interrupt flag for callers up the stack
                 }
                 flushPage.flush();
                 // TODO: pool large pages
@@ -99,8 +98,9 @@
                 }
             }
         } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.severe("ReplicationLogFlusher is terminating abnormally. Logs Replication Stopped.");
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "TxnLogReplicator is terminating abnormally. Logs Replication Stopped.", e);
+            }
             throw e;
         }
     }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 6023cb1..bc7451f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -21,6 +21,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
@@ -88,7 +89,8 @@
         return hostName;
     }
 
-    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer) throws IOException {
+    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer)
+            throws IOException {
         while (requestBuffer.hasRemaining()) {
             socketChannel.write(requestBuffer);
         }
@@ -107,4 +109,10 @@
         long fileSize = fileChannel.size();
         fileChannel.transferFrom(socketChannel, pos, fileSize);
     }
+
+    public static InetSocketAddress getSocketAddress(SocketChannel socketChannel) {
+        String hostAddress = socketChannel.socket().getInetAddress().getHostAddress(); // remote peer IP literal
+        int port = socketChannel.socket().getPort(); // remote port; 0 if the socket is not connected
+        return InetSocketAddress.createUnresolved(hostAddress, port); // unresolved: no DNS lookup is performed
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index a152f6c..4eeaede 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -414,46 +414,62 @@
         }
 
         private void handleLogReplication() throws IOException, ACIDException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            //start with one log page; readRequest returns the buffer to use (presumably enlarged when needed)
+            inBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
+            while (true) {
+                //read the next length-prefixed request: a batch of logs or the end-of-handshake marker
+                inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+                //a single-byte payload marks the end of the log replication handshake
+                if (inBuffer.remaining() == 1) {
+                    break;
+                }
 
-            //Deserialize log
-            remoteLog.readRemoteLog(inBuffer, false);
-            remoteLog.setLogSource(LogSource.REMOTE);
+                /**
+                 * Process every log in the batch; each log is preceded by its int size
+                 */
+                while (inBuffer.hasRemaining()) {
+                    //skip the per-log size prefix
+                    inBuffer.getInt();
+                    //Deserialize log
+                    remoteLog.readRemoteLog(inBuffer);
+                    remoteLog.setLogSource(LogSource.REMOTE);
 
-            switch (remoteLog.getLogType()) {
-                case LogType.UPDATE:
-                case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
-                    //if the log partition belongs to a partitions hosted on this node, replicate it
-                    if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
-                        logManager.log(remoteLog);
+                    switch (remoteLog.getLogType()) {
+                        case LogType.UPDATE:
+                        case LogType.ENTITY_COMMIT:
+                        case LogType.UPSERT_ENTITY_COMMIT:
+                            //replicate the log only if its partition is hosted on this node
+                            if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+                                logManager.log(remoteLog);
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                        case LogType.ABORT:
+                            LogRecord jobTerminationLog = new LogRecord();
+                            TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+                                    remoteLog.getLogType() == LogType.JOB_COMMIT);
+                            jobTerminationLog.setReplicationThread(this);
+                            jobTerminationLog.setLogSource(LogSource.REMOTE);
+                            logManager.log(jobTerminationLog);
+                            break;
+                        case LogType.FLUSH:
+                            //store mapping information for flush logs to use them in incoming LSM components.
+                            RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                            flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+                            flushLogMap.setRemoteLSN(remoteLog.getLSN());
+                            logManager.log(remoteLog);
+                            //the log LSN value is updated by logManager.log(.) to a local value
+                            flushLogMap.setLocalLSN(remoteLog.getLSN());
+                            flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+                            replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                            synchronized (flushLogslock) {
+                                flushLogslock.notify();
+                            }
+                            break;
+                        default:
+                            LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
                     }
-                    break;
-                case LogType.JOB_COMMIT:
-                case LogType.ABORT:
-                    LogRecord jobTerminationLog = new LogRecord();
-                    TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
-                            remoteLog.getLogType() == LogType.JOB_COMMIT);
-                    jobTerminationLog.setReplicationThread(this);
-                    jobTerminationLog.setLogSource(LogSource.REMOTE);
-                    logManager.log(jobTerminationLog);
-                    break;
-                case LogType.FLUSH:
-                    //store mapping information for flush logs to use them in incoming LSM components.
-                    RemoteLogMapping flushLogMap = new RemoteLogMapping();
-                    flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
-                    flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                    logManager.log(remoteLog);
-                    //the log LSN value is updated by logManager.log(.) to a local value
-                    flushLogMap.setLocalLSN(remoteLog.getLSN());
-                    flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                    replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
-                    synchronized (flushLogslock) {
-                        flushLogslock.notify();
-                    }
-                    break;
-                default:
-                    LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+                }
             }
         }
 
@@ -594,4 +610,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index ee872a5..fc1956d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -71,7 +71,7 @@
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
-import org.apache.asterix.replication.logging.ReplicationLogFlusher;
+import org.apache.asterix.replication.logging.TxnLogReplicator;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
@@ -86,6 +86,8 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 /**
  * This class is used to process replication jobs and maintain remote replicas states
@@ -93,7 +95,7 @@
 public class ReplicationManager implements IReplicationManager {
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
-    private final int INITIAL_REPLICATION_FACTOR = 1;
+    private static final int INITIAL_REPLICATION_FACTOR = 1;
     private final String nodeId;
     private ExecutorService replicationListenerThreads;
     private final Map<Integer, Set<String>> jobCommitAcks;
@@ -114,7 +116,7 @@
     private final AtomicBoolean replicationSuspended;
     private AtomicBoolean terminateJobsReplication;
     private AtomicBoolean jobsReplicationSuspended;
-    private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
     private final Set<String> shuttingDownReplicaIds;
     //replication threads
     private ReplicationJobsProccessor replicationJobsProcessor;
@@ -128,9 +130,10 @@
     private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
     private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
     protected ReplicationLogBuffer currentTxnLogBuffer;
-    private ReplicationLogFlusher txnlogsReplicator;
+    private TxnLogReplicator txnlogReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
-    private Map<String, SocketChannel> logsReplicaSockets = null;
+    private SocketChannel[] logsRepSockets; // one per active remote replica; null while log replication is not running
+    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES); // batch size header
 
     //TODO this class needs to be refactored by moving its private classes to separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -179,13 +182,14 @@
             clientPartitonsSet.addAll(clientPartitions);
             replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
         }
-        int numLogBuffers = logManager.getNumLogPages();
+        int numLogBuffers = replicationProperties.getLogBufferNumOfPages();
         emptyLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
         pendingFlushLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
 
-        int logBufferSize = logManager.getLogPageSize();
+        int logBufferSize = replicationProperties.getLogBufferPageSize();
         for (int i = 0; i < numLogBuffers; i++) {
-            emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferSize));
+            emptyLogBuffersQ
+                    .offer(new ReplicationLogBuffer(this, logBufferSize, replicationProperties.getLogBatchSize()));
         }
     }
 
@@ -209,16 +213,13 @@
     }
 
     @Override
-    public void replicateLog(ILogRecord logRecord) {
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             //if replication is suspended, wait until it is resumed.
-            while (replicationSuspended.get()) {
-                synchronized (replicationSuspended) {
-                    try {
-                        replicationSuspended.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
-                    }
+            //check the flag while holding the monitor so a resume cannot be missed between check and wait()
+            synchronized (replicationSuspended) {
+                while (replicationSuspended.get()) {
+                    replicationSuspended.wait();
                 }
             }
             Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>());
@@ -232,29 +232,23 @@
     protected void getAndInitNewLargePage(int pageSize) {
         // for now, alloc a new buffer for each large page
         // TODO: consider pooling large pages
-        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize);
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
+        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize());
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    protected void getAndInitNewPage() {
+    protected void getAndInitNewPage() throws InterruptedException {
         currentTxnLogBuffer = null;
         while (currentTxnLogBuffer == null) {
-            try {
-                currentTxnLogBuffer = emptyLogBuffersQ.take();
-            } catch (InterruptedException e) {
-                //ignore
-            }
+            currentTxnLogBuffer = emptyLogBuffersQ.take(); // blocks until an empty buffer is available
         }
         currentTxnLogBuffer.reset();
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    private synchronized void appendToLogBuffer(ILogRecord logRecord) {
+    private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
         if (!currentTxnLogBuffer.hasSpace(logRecord)) {
             currentTxnLogBuffer.isFull(true);
-            if (logRecord.getLogSize() > logManager.getLogPageSize()) {
+            if (logRecord.getLogSize() > getLogPageSize()) { // oversized log gets a dedicated page
                 getAndInitNewLargePage(logRecord.getLogSize());
             } else {
                 getAndInitNewPage();
@@ -319,6 +313,7 @@
                         }
 
                         LOGGER.log(Level.INFO, "Replicating file: " + filePath);
+
                         //open file for reading
                         try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
                                 FileChannel fileChannel = fromFile.getChannel();) {
@@ -458,7 +454,7 @@
     }
 
     /**
-     * Suspends proccessing replication jobs.
+     * Suspends processing replication jobs and transaction log replication.
      *
      * @param force
      *            a flag indicates if replication should be suspended right away or when the pending jobs are completed.
@@ -484,7 +480,7 @@
         }
 
         //suspend logs replication
-        if (txnlogsReplicator != null) {
+        if (txnlogReplicator != null) { // null until startReplicationThreads() creates it
             terminateTxnLogsReplicator();
         }
     }
@@ -492,45 +488,113 @@
     /**
-     * Opens a new connection with Active remote replicas and starts a listen thread per connection.
+     * Opens a new connection with active remote replicas, starts a listener thread per connection, and sends
+     * the log replication handshake request on each connection.
      */
-    private void establishTxnLogsReplicationConnection() {
-        logsReplicaSockets = getActiveRemoteReplicasSockets();
+    private void establishTxnLogsRepHandshake() {
+        Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets();
+        logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()];
+        int i = 0;
         //start a listener thread per connection
-        for (Entry<String, SocketChannel> entry : logsReplicaSockets.entrySet()) {
+        for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) {
+            logsRepSockets[i] = entry.getValue();
             replicationListenerThreads
                     .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
+            i++;
+        }
+
+        //establish the log replication handshake by sending a REPLICATE_LOG request to every replica
+        ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE)
+                .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+        handshakeBuffer.flip();
+        sendToLogReplicas(handshakeBuffer);
+    }
+
+    /**
+     * Writes the remaining bytes of {@code buffer} to every open log replication socket.
+     * The buffer position is reset to 0 after each socket so that every replica receives the same bytes.
+     * A socket whose write fails is handed to {@link #handleReplicationFailure(SocketChannel, Throwable)}.
+     */
+    private void sendToLogReplicas(ByteBuffer buffer) {
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            if (!replicaSocket.isOpen()) {
+                //closed by an earlier failure; skip it instead of reporting the same replica again
+                continue;
+            }
+            try {
+                NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                buffer.position(0);
+            }
         }
     }
 
+    /**
+     * Logs a failed replication request, closes {@code socketChannel} if it is still open, and reports the
+     * replica that owns the socket as failed.
+     */
+    private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) {
+        if (LOGGER.isLoggable(Level.WARNING)) {
+            LOGGER.log(Level.WARNING, "Could not complete replication request.", t);
+        }
+        if (socketChannel.isOpen()) {
+            try {
+                socketChannel.close();
+            } catch (IOException e) {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Could not close socket.", e);
+                }
+            }
+        }
+        reportFailedReplica(getReplicaIdBySocket(socketChannel));
+    }
+
     /**
-     * Stops ReplicationFlusherThread and closes the sockets used to replicate logs.
+     * Stops TxnLogReplicator and closes the sockets used to replicate logs.
      */
     private void terminateTxnLogsReplicator() {
-        LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ...");
-        txnlogsReplicator.terminate();
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Terminating TxnLogReplicator thread ...");
+        }
+        txnlogReplicator.terminate();
         try {
             txnLogReplicatorTask.get();
         } catch (ExecutionException | InterruptedException e) {
-            LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated abnormally");
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "TxnLogReplicator thread terminated abnormally", e);
+            }
         }
-        LOGGER.log(Level.INFO, "LogFlusher thread is terminated.");
 
-        if (logsReplicaSockets != null) {
-            //wait for any ACK to arrive before closing sockets.
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("TxnLogReplicator thread was terminated.");
+        }
+
+        //the sockets are null when log replication was never started or has already been terminated
+        if (logsRepSockets != null) {
+            //end the log replication handshake by sending a dummy log with a single byte
+            ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.BYTES + 1).putInt(1).put((byte) 0);
+            endLogRepHandshake.flip();
+            sendToLogReplicas(endLogRepHandshake);
+
+            //wait for all pending job ACKs to arrive before ending the connections
             synchronized (jobCommitAcks) {
-                while (jobCommitAcks.size() != 0) {
-                    try {
+                try {
+                    while (jobCommitAcks.size() != 0) {
                         jobCommitAcks.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
                     }
+                } catch (InterruptedException e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.log(Level.SEVERE, "Interrupted while waiting for jobs ACK", e);
+                    }
+                    Thread.currentThread().interrupt();
                 }
             }
-
-            //close log replication sockets
-            closeReplicaSockets(logsReplicaSockets);
-            logsReplicaSockets = null;
+
+            //say goodbye on every log replication socket
+            //NOTE(review): the sockets are not closed here; confirm the response listeners close them
+            sendToLogReplicas(ReplicationProtocol.getGoodbyeBuffer());
+            logsRepSockets = null;
         }
     }
 
     /**
@@ -651,8 +720,10 @@
      *            The new state of the replica.
      * @param suspendReplication
      *            a flag indicating whether to suspend replication on state change or not.
+     * @throws InterruptedException if the calling thread is interrupted
      */
-    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) {
+    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication)
+            throws InterruptedException {
         Replica replica = replicas.get(replicaId);
 
         if (replica.getState() == newState) {
@@ -702,22 +773,24 @@
      *            The remote replica id the ACK received from.
      */
     private void addAckToJob(int jobId, String replicaId) {
-        //add ACK to the job
-        if (jobCommitAcks.containsKey(jobId)) {
-            Set<String> replicaIds = jobCommitAcks.get(jobId);
-            replicaIds.add(replicaId);
-        } else {
-            throw new IllegalStateException("Job ID not found in pending job commits  " + jobId);
-        }
+        synchronized (jobCommitAcks) { // same lock as hasBeenReplicated()
+            //add ACK to the job
+            if (jobCommitAcks.containsKey(jobId)) {
+                Set<String> replicaIds = jobCommitAcks.get(jobId);
+                replicaIds.add(replicaId);
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Invalid job replication ACK received for jobId(" + jobId + ")");
+                }
+                return; // ignore ACKs for unknown or already completed jobs
+            }
 
-        //if got ACKs from all remote replicas, notify pending jobs if any
-        if (jobCommitAcks.get(jobId).size() == replicationFactor) {
-            synchronized (replicationJobsPendingAcks) {
-                if (replicationJobsPendingAcks.containsKey(jobId)) {
-                    ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
-                    synchronized (pendingLog) {
-                        pendingLog.notify();
-                    }
+            //once every remote replica has ACKed, wake the thread waiting on the job's pending log (if any)
+
+            if (jobCommitAcks.get(jobId).size() == replicationFactor && replicationJobsPendingAcks.containsKey(jobId)) {
+                ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
+                synchronized (pendingLog) {
+                    pendingLog.notify();
                 }
             }
         }
@@ -725,26 +798,25 @@
 
     @Override
     public boolean hasBeenReplicated(ILogRecord logRecord) {
-        if (jobCommitAcks.containsKey(logRecord.getJobId())) {
-            //check if all ACKs have been received
-            if (jobCommitAcks.get(logRecord.getJobId()).size() == replicationFactor) {
-                jobCommitAcks.remove(logRecord.getJobId());
+        int jobId = logRecord.getJobId();
+        synchronized (jobCommitAcks) { // check-then-act below must be atomic with addAckToJob()
+            if (jobCommitAcks.containsKey(jobId)) {
+                //check if all ACKs have been received
+                if (jobCommitAcks.get(jobId).size() == replicationFactor) {
+                    jobCommitAcks.remove(jobId);
 
-                if (replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
-                    replicationJobsPendingAcks.remove(logRecord);
-                }
+                    //remove from pending jobs if exists
+                    replicationJobsPendingAcks.remove(jobId);
 
-                //notify any threads waiting for all jobs to finish
-                if (jobCommitAcks.size() == 0) {
-                    synchronized (jobCommitAcks) {
+                    //notify any threads waiting for all jobs to finish
+                    if (jobCommitAcks.size() == 0) {
                         jobCommitAcks.notifyAll();
                     }
+                    return true;
+                } else {
+                    replicationJobsPendingAcks.putIfAbsent(jobId, logRecord);
+                    return false;
                 }
-
-                return true;
-            } else {
-                replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord);
-                return false;
             }
         }
 
@@ -776,7 +848,7 @@
      * @throws IOException
      */
     private SocketChannel getReplicaSocket(String replicaId) throws IOException {
-        Replica replica = replicas.get(replicaId);
+        Replica replica = replicationProperties.getReplicaById(replicaId); // not via the replicas map
         SocketChannel sc = SocketChannel.open();
         sc.configureBlocking(true);
         InetSocketAddress address = replica.getAddress(replicationProperties);
@@ -854,9 +926,7 @@
 
     @Override
     public void reportReplicaEvent(ReplicaEvent event) {
-        synchronized (replicaEventsQ) {
-            replicaEventsQ.offer(event);
-        }
+        replicaEventsQ.offer(event); // NOTE(review): assumes replicaEventsQ is a thread-safe queue
     }
 
     /**
@@ -867,6 +937,9 @@
      */
     public void reportFailedReplica(String replicaId) {
         Replica replica = replicas.get(replicaId);
+        if (replica == null) { // e.g. no replica matched a failed socket's address
+            return;
+        }
         if (replica.getState() == ReplicaState.DEAD) {
             return;
         }
@@ -878,16 +951,35 @@
         reportReplicaEvent(event);
     }
 
+    /**
+     * Finds the replica whose replication address matches the remote end of {@code socketChannel}.
+     * Hosts are compared with {@link InetSocketAddress#getHostString()}, which never performs a reverse DNS
+     * lookup (unlike {@code getHostName()}), so the literal host strings on both sides are compared.
+     *
+     * @return the matching replica id, or {@code null} if no replica matches
+     */
+    private String getReplicaIdBySocket(SocketChannel socketChannel) {
+        InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel);
+        for (Replica replica : replicas.values()) {
+            InetSocketAddress replicaAddress = replica.getAddress(replicationProperties);
+            if (replicaAddress.getPort() == socketAddress.getPort()
+                    && replicaAddress.getHostString().equals(socketAddress.getHostString())) {
+                return replica.getId();
+            }
+        }
+        return null;
+    }
+
     @Override
-    public void startReplicationThreads() {
+    public void startReplicationThreads() throws InterruptedException {
         replicationJobsProcessor = new ReplicationJobsProccessor();
 
         //start/continue processing jobs/logs
-        if (logsReplicaSockets == null) {
-            establishTxnLogsReplicationConnection();
+        if (logsRepSockets == null) {
+            establishTxnLogsRepHandshake();
             getAndInitNewPage();
-            txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ, pendingFlushLogBuffersQ);
-            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator);
+            txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
+            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator);
         }
 
         replicationJobsProcessor.start();
@@ -1037,7 +1122,46 @@
     }
 
     public int getLogPageSize() {
-        return logManager.getLogPageSize();
+        return replicationProperties.getLogBufferPageSize();
+    }
+
+    @Override
+    public void replicateTxnLogBatch(final ByteBuffer buffer) {
+        //if replication is suspended, wait until it is resumed (flag re-checked while holding the monitor)
+        try {
+            synchronized (replicationSuspended) {
+                while (replicationSuspended.get()) {
+                    replicationSuspended.wait();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+        //prepare the batch size buffer
+        txnLogsBatchSizeBuffer.clear();
+        txnLogsBatchSizeBuffer.putInt(buffer.remaining());
+        txnLogsBatchSizeBuffer.flip();
+
+        buffer.mark();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            if (!replicaSocket.isOpen()) {
+                //closed by an earlier failure; skip it instead of reporting the same replica on every batch
+                continue;
+            }
+            try {
+                //send batch size
+                NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
+                //send log
+                NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                txnLogsBatchSizeBuffer.position(0);
+                buffer.reset();
+            }
+        }
+        //move the buffer position to the sent limit
+        buffer.position(buffer.limit());
     }
 
     //supporting classes
@@ -1068,12 +1188,15 @@
                             break;
                     }
                 } catch (InterruptedException e) {
-                    //ignore
+                    //restore the interrupt status and stop: looping again with the flag set would make the next
+                    //interruptible wait throw immediately and busy-spin
+                    Thread.currentThread().interrupt();
+                    return;
                 }
             }
         }
 
-        public void handleReplicaFailure(String replicaId) {
+        public void handleReplicaFailure(String replicaId) throws InterruptedException {
             Replica replica = replicas.get(replicaId);
 
             if (replica.getState() == ReplicaState.DEAD) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 47e60b2..d14a0f7 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -204,7 +204,7 @@
     }
 
     @Override
-    public void completeFailbackProcess() throws IOException {
+    public void completeFailbackProcess() throws IOException, InterruptedException {
         ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
         ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
                 .getReplicaResourcesManager();
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 872adcd..e9d8a8c 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -113,7 +113,7 @@
 
     @Override
     public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
-        logRecord.writeLogRecord(appendBuffer, appendLSN);
+        logRecord.writeLogRecord(appendBuffer);
 
         if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
                 && logRecord.getLogType() != LogType.WAIT) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index cacd036..8e615af 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -37,10 +37,6 @@
 
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
-        if (logRecord.getLogSize() > logPageSize) {
-            throw new IllegalStateException();
-        }
-
         //only locally generated logs should be replicated
         logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
 
@@ -58,7 +54,11 @@
         syncAppendToLogTail(logRecord);
 
         if (logRecord.isReplicated()) {
-            replicationManager.replicateLog(logRecord);
+            try {
+                replicationManager.replicateLog(logRecord);
+            } catch (InterruptedException e) {
+                throw new ACIDException(e);
+            }
         }
 
         if (logRecord.getLogSource() == LogSource.LOCAL) {
@@ -69,7 +69,7 @@
                         try {
                             logRecord.wait();
                         } catch (InterruptedException e) {
-                            //ignore
+                            Thread.currentThread().interrupt();
                         }
                     }
 
@@ -79,7 +79,7 @@
                             try {
                                 logRecord.wait();
                             } catch (InterruptedException e) {
-                                //ignore
+                                Thread.currentThread().interrupt();
                             }
                         }
                     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/883

to look at the new patch set (#4).

Change subject: Txn Log Replication Optimizations
......................................................................

Txn Log Replication Optimizations

- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per logs batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Bug fixes for big objects replication.
- Some SONAR fixes for old code.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
16 files changed, 653 insertions(+), 522 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/83/883/4
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/883

to look at the new patch set (#2).

Change subject: Txn Log Replication Optimizations
......................................................................

Txn Log Replication Optimizations

- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per logs batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Bug fixes for big objects replication.
- Some SONAR fixes for old code.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
16 files changed, 643 insertions(+), 518 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/83/883/2
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Txn Log Replication Optimizations
......................................................................


Patch Set 3:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1491/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Txn Log Replication Optimizations
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1485/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/883

to look at the new patch set (#3).

Change subject: Txn Log Replication Optimizations
......................................................................

Txn Log Replication Optimizations

- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per logs batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Bug fixes for big objects replication.
- Some SONAR fixes for old code.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
16 files changed, 649 insertions(+), 520 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/83/883/3
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Txn Log Replication Optimizations
......................................................................


Patch Set 2:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1490/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Txn Log Replication Optimizations
......................................................................


Patch Set 4:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1545/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Murtadha Hubail has submitted this change and it was merged.

Change subject: Txn Log Replication Optimizations
......................................................................


Txn Log Replication Optimizations

- Add configurations for txn log replication.
- Establish handshake for continuous log replication.
- Replicate logs in batches instead of one by one.
- Eliminate HashMap iterator object creation per logs batch.
- Fix synchronization for jobs' ACKs from remote replicas.
- Bug fixes for big objects replication.
- Some SONAR fixes for old code.

Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Reviewed-on: https://asterix-gerrit.ics.uci.edu/883
Reviewed-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
R asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
16 files changed, 653 insertions(+), 522 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Looks good to me, but someone else must approve; Verified



diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 0a6d62d..c71d77e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -150,7 +150,7 @@
         }
     }
 
-    private void startReplicationService() {
+    private void startReplicationService() throws InterruptedException {
         //Open replication channel
         runtimeContext.getReplicationChannel().start();
 
diff --git a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index b1d0649..1d693f9 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -98,10 +98,4 @@
     <description>Enabling plot of Algebricks plan to tmp folder. (Default = false)
     </description>
   </property>
-  <property>
-    <name>log.level</name>
-    <value>WARNING</value>
-    <description>The minimum log level to be displayed. (Default = INFO)
-    </description>
-  </property>
 </asterixConfiguration>
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 5d31d9a..7f51bbd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -26,18 +26,30 @@
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 public class AsterixReplicationProperties extends AbstractAsterixProperties {
 
     private static final Logger LOGGER = Logger.getLogger(AsterixReplicationProperties.class.getName());
 
-    private static int REPLICATION_DATAPORT_DEFAULT = 2000;
-    private static int REPLICATION_FACTOR_DEFAULT = 1;
-    private static int REPLICATION_TIME_OUT_DEFAULT = 15;
+    private static final int REPLICATION_DATAPORT_DEFAULT = 2000;
+    private static final int REPLICATION_FACTOR_DEFAULT = 1;
+    private static final int REPLICATION_TIME_OUT_DEFAULT = 15;
     private static final int MAX_REMOTE_RECOVERY_ATTEMPTS = 5;
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
+
+    private static final String REPLICATION_LOG_BATCH_SIZE_KEY = "replication.log.batchsize";
+    private static final int REPLICATION_LOG_BATCH_SIZE_DEFAULT = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+
+    private static final String REPLICATION_LOG_BUFFER_NUM_PAGES_KEY = "replication.log.buffer.numpages";
+    private static final int REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT = 8;
+
+    private static final String REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY = "replication.log.buffer.pagesize";
+    private static final int REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT = StorageUtil.getSizeInBytes(128,
+            StorageUnit.KILOBYTE);
 
     public AsterixReplicationProperties(AsterixPropertiesAccessor accessor, Cluster cluster) {
         super(accessor);
@@ -90,7 +102,7 @@
     }
 
     public Set<Replica> getRemoteReplicas(String nodeId) {
-        Set<Replica> remoteReplicas = new HashSet<Replica>();;
+        Set<Replica> remoteReplicas = new HashSet<>();;
 
         int numberOfRemoteReplicas = getReplicationFactor() - 1;
         //Using chained-declustering
@@ -161,7 +173,7 @@
     }
 
     public Set<String> getRemoteReplicasIds(String nodeId) {
-        Set<String> remoteReplicasIds = new HashSet<String>();
+        Set<String> remoteReplicasIds = new HashSet<>();
         Set<Replica> remoteReplicas = getRemoteReplicas(nodeId);
 
         for (Replica replica : remoteReplicas) {
@@ -176,7 +188,7 @@
     }
 
     public Set<String> getNodeReplicasIds(String nodeId) {
-        Set<String> replicaIds = new HashSet<String>();
+        Set<String> replicaIds = new HashSet<>();
         replicaIds.add(nodeId);
         replicaIds.addAll(getRemoteReplicasIds(nodeId));
         return replicaIds;
@@ -245,4 +257,19 @@
     public int getMaxRemoteRecoveryAttempts() {
         return MAX_REMOTE_RECOVERY_ATTEMPTS;
     }
+
+    public int getLogBufferPageSize() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_PAGE_SIZE_KEY, REPLICATION_LOG_BUFFER_PAGE_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBufferNumOfPages() {
+        return accessor.getProperty(REPLICATION_LOG_BUFFER_NUM_PAGES_KEY, REPLICATION_LOG_BUFFER_NUM_PAGES_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
+
+    public int getLogBatchSize() {
+        return accessor.getProperty(REPLICATION_LOG_BATCH_SIZE_KEY, REPLICATION_LOG_BATCH_SIZE_DEFAULT,
+                PropertyInterpreters.getIntegerPropertyInterpreter());
+    }
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index a2738e8..9f9d74b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -43,6 +43,7 @@
      * Requests the remaining LSM disk components files from active remote replicas.
      *
      * @throws IOException
+     * @throws InterruptedException
      */
-    public void completeFailbackProcess() throws IOException;
+    public void completeFailbackProcess() throws IOException, InterruptedException;
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 755fbbd..6bd1505 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.replication;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Set;
 
 import org.apache.asterix.common.transactions.ILogRecord;
@@ -31,8 +32,9 @@
      *
      * @param logRecord
      *            The log record to be replicated,
+     * @throws InterruptedException
      */
-    public void replicateLog(ILogRecord logRecord);
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException;
 
     /**
      * Checks whether a log record has been replicated
@@ -79,8 +81,10 @@
 
     /**
      * Starts processing of ASYNC replication jobs as well as Txn logs.
+     *
+     * @throws InterruptedException
      */
-    public void startReplicationThreads();
+    public void startReplicationThreads() throws InterruptedException;
 
     /**
      * Checks and sets each remote replica state.
@@ -114,4 +118,13 @@
      * @throws IOException
      */
     public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
+
+    /**
+     * Transfers the contents of the {@code buffer} to active remote replicas.
+     * The transfer starts from the {@code buffer} current position to its limit.
+     * After the transfer, the {@code buffer} position will be its limit.
+     *
+     * @param buffer
+     */
+    public void replicateTxnLogBatch(ByteBuffer buffer);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
index 4c3f728..b8fe4b2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/Replica.java
@@ -62,8 +62,7 @@
     public InetSocketAddress getAddress(AsterixReplicationProperties asterixReplicationProperties) {
         String replicaIPAddress = node.getClusterIp();
         int replicationPort = asterixReplicationProperties.getDataReplicationPort(node.getId());
-        InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
-        return replicaAddress;
+        return InetSocketAddress.createUnresolved(replicaIPAddress, replicationPort);
     }
 
     public static Replica create(DataInput input) throws IOException {
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 3738cd1..1992a00 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -110,9 +110,7 @@
 
     public String getNodeId();
 
-    public int writeRemoteRecoveryLog(ByteBuffer buffer);
-
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog);
+    public void readRemoteLog(ByteBuffer buffer);
 
     public void setReplicationThread(IReplicationThread replicationThread);
 
@@ -120,11 +118,7 @@
 
     public byte getLogSource();
 
-    public int getSerializedLogSize();
-
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN);
-
-    public ByteBuffer getSerializedLog();
+    public int getRemoteLogSize();
 
     public void setNodeId(String nodeId);
 
@@ -138,4 +132,6 @@
      * @return a flag indicating whether the log record should be sent to remote replicas
      */
     public boolean isReplicated();
+
+    public void writeRemoteLogRecord(ByteBuffer buffer);
 }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index fd56913..4823a92 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -29,7 +29,7 @@
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
 import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 
-/*
+/**
  * == LogRecordFormat ==
  * ---------------------------
  * [Header1] (6 bytes) : for all log types
@@ -44,8 +44,7 @@
  * PKValueSize(4)
  * PKValue(PKValueSize)
  * ---------------------------
- * [Header3] (20 bytes) : only for update log type
- * PrevLSN(8)
+ * [Header3] (12 bytes) : only for update log type
  * ResourceId(8) //stored in .metadata of the corresponding index in NC node
  * LogRecordSize(4)
  * ---------------------------
@@ -61,16 +60,34 @@
  * = LogSize =
  * 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
  * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + PKValueSize
- *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
+ * --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
  * 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
- *    --> UPDATE_LOG_BASE_SIZE = 59
+ * --> UPDATE_LOG_BASE_SIZE = 59
  * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
  * 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
- *    --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
- *        it also includes LogSource and JobId fields.
+ * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
+ * it also includes LogSource and JobId fields.
  */
 
 public class LogRecord implements ILogRecord {
+
+    private static final int LOG_SOURCE_LEN = Byte.BYTES;
+    private static final int TYPE_LEN = Byte.BYTES;
+    public static final int PKHASH_LEN = Integer.BYTES;
+    public static final int PKSZ_LEN = Integer.BYTES;
+    private static final int RS_PARTITION_LEN = Integer.BYTES;
+    private static final int RSID_LEN = Long.BYTES;
+    private static final int LOGRCD_SZ_LEN = Integer.BYTES;
+    private static final int FLDCNT_LEN = Integer.BYTES;
+    private static final int NEWOP_LEN = Byte.BYTES;
+    private static final int NEWVALSZ_LEN = Integer.BYTES;
+    private static final int CHKSUM_LEN = Long.BYTES;
+
+    private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+    private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
+            + PKSZ_LEN;
+    private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+    private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
 
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
@@ -101,9 +118,8 @@
     private int[] PKFields;
     private PrimaryIndexOperationTracker opTracker;
     private IReplicationThread replicationThread;
-    private ByteBuffer serializedLog;
     /**
-     * The fields (numOfFlushedIndexes and nodeId) are used for serialized flush logs only
+     * The fields (numOfFlushedIndexes and nodeId) are used for remote flush logs only
      * to indicate the source of the log and how many indexes were flushed using its LSN.
      */
     private int numOfFlushedIndexes;
@@ -118,25 +134,6 @@
         checksumGen = new CRC32();
         logSource = LogSource.LOCAL;
     }
-
-    private final static int LOG_SOURCE_LEN = Byte.BYTES;
-    private final static int TYPE_LEN = Byte.BYTES;
-    public final static int PKHASH_LEN = Integer.BYTES;
-    public final static int PKSZ_LEN = Integer.BYTES;
-    private final static int RS_PARTITION_LEN = Integer.BYTES;
-    private final static int RSID_LEN = Long.BYTES;
-    private final static int LOGRCD_SZ_LEN = Integer.BYTES;
-    private final static int FLDCNT_LEN = Integer.BYTES;
-    private final static int NEWOP_LEN = Byte.BYTES;
-    private final static int NEWVALSZ_LEN = Integer.BYTES;
-    private final static int CHKSUM_LEN = Long.BYTES;
-
-    private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
-    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
-            + PKSZ_LEN;
-    private final static int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
-    private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-    private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
 
     private void writeLogRecordCommonFields(ByteBuffer buffer) {
         buffer.put(logSource);
@@ -173,39 +170,15 @@
         buffer.putLong(checksum);
     }
 
-    // this method is used when replication is enabled to include the log record LSN in the serialized version
     @Override
-    public void writeLogRecord(ByteBuffer buffer, long appendLSN) {
-        int beginOffset = buffer.position();
+    public void writeRemoteLogRecord(ByteBuffer buffer) {
         writeLogRecordCommonFields(buffer);
-
-        if (replicated) {
-            //copy the serialized log to send it to replicas
-            int serializedLogSize = getSerializedLogSize();
-            if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
-                serializedLog = ByteBuffer.allocate(serializedLogSize);
-            } else {
-                serializedLog.clear();
-            }
-
-            int currentPosition = buffer.position();
-            int currentLogSize = (currentPosition - beginOffset);
-
-            buffer.position(beginOffset);
-            buffer.get(serializedLog.array(), 0, currentLogSize);
-            serializedLog.position(currentLogSize);
-            if (logType == LogType.FLUSH) {
-                serializedLog.putLong(appendLSN);
-                serializedLog.putInt(numOfFlushedIndexes);
-                serializedLog.putInt(nodeId.length());
-                serializedLog.put(nodeId.getBytes());
-            }
-            serializedLog.flip();
-            buffer.position(currentPosition);
+        if (logType == LogType.FLUSH) {
+            buffer.putLong(LSN);
+            buffer.putInt(numOfFlushedIndexes);
+            buffer.putInt(nodeId.length());
+            buffer.put(nodeId.getBytes());
         }
-
-        checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
-        buffer.putLong(checksum);
     }
 
     private void writePKValue(ByteBuffer buffer) {
@@ -221,8 +194,12 @@
     }
 
     private void writeTuple(ByteBuffer buffer, ITupleReference tuple, int size) {
-        tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
-        // writeTuple() doesn't change the position of the buffer.
+        if (logSource == LogSource.LOCAL) {
+            tupleWriter.writeTuple(tuple, buffer.array(), buffer.position());
+        } else {
+            //since the tuple is already serialized in remote logs, just copy it from beginning to end.
+            System.arraycopy(tuple.getFieldData(0), 0, buffer.array(), buffer.position(), size);
+        }
         buffer.position(buffer.position() + size);
     }
 
@@ -323,47 +300,19 @@
     }
 
     @Override
-    public RecordReadStatus readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog) {
-        int beginOffset = buffer.position();
-
+    public void readRemoteLog(ByteBuffer buffer) {
         //read common fields
-        RecordReadStatus status = readLogCommonFields(buffer);
-        if (status != RecordReadStatus.OK) {
-            buffer.position(beginOffset);
-            return status;
-        }
+        readLogCommonFields(buffer);
 
         if (logType == LogType.FLUSH) {
-            if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
-                LSN = buffer.getLong();
-                numOfFlushedIndexes = buffer.getInt();
-                //read serialized node id
-                int nodeIdLength = buffer.getInt();
-                if (buffer.remaining() >= nodeIdLength) {
-                    byte[] nodeIdBytes = new byte[nodeIdLength];
-                    buffer.get(nodeIdBytes);
-                    nodeId = new String(nodeIdBytes);
-                } else {
-                    buffer.position(beginOffset);
-                    return RecordReadStatus.TRUNCATED;
-                }
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
+            LSN = buffer.getLong();
+            numOfFlushedIndexes = buffer.getInt();
+            //read serialized node id
+            int nodeIdLength = buffer.getInt();
+            byte[] nodeIdBytes = new byte[nodeIdLength];
+            buffer.get(nodeIdBytes);
+            nodeId = new String(nodeIdBytes);
         }
-
-        //remote recovery logs need to have the LSN to check which should be replayed
-        if (remoteRecoveryLog) {
-            if (buffer.remaining() >= Long.BYTES) {
-                LSN = buffer.getLong();
-            } else {
-                buffer.position(beginOffset);
-                return RecordReadStatus.TRUNCATED;
-            }
-        }
-
-        return RecordReadStatus.OK;
     }
 
     private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -443,16 +392,6 @@
             builder.append(" ResourceId : ").append(resourceId);
         }
         return builder.toString();
-    }
-
-    @Override
-    public int writeRemoteRecoveryLog(ByteBuffer buffer) {
-        int bufferBegin = buffer.position();
-        writeLogRecordCommonFields(buffer);
-        //FLUSH logs should not included in remote recovery
-        //LSN must be included in all remote recovery logs
-        buffer.putLong(LSN);
-        return buffer.position() - bufferBegin;
     }
 
     ////////////////////////////////////////////
@@ -535,18 +474,18 @@
     }
 
     @Override
-    public int getSerializedLogSize() {
-        int serilizedSize = logSize;
+    public int getRemoteLogSize() {
+        int remoteLogSize = logSize;
         if (logType == LogType.FLUSH) {
             //LSN
-            serilizedSize += Long.BYTES;
+            remoteLogSize += Long.BYTES;
             //num of indexes
-            serilizedSize += Integer.BYTES;
+            remoteLogSize += Integer.BYTES;
             //serialized node id String
-            serilizedSize += Integer.BYTES + nodeId.length();
+            remoteLogSize += Integer.BYTES + nodeId.length();
         }
-        serilizedSize -= CHKSUM_LEN;
-        return serilizedSize;
+        remoteLogSize -= CHKSUM_LEN;
+        return remoteLogSize;
     }
 
     @Override
@@ -628,15 +567,6 @@
 
     public PrimaryIndexOperationTracker getOpTracker() {
         return opTracker;
-    }
-
-    @Override
-    public ByteBuffer getSerializedLog() {
-        return serializedLog;
-    }
-
-    public void setSerializedLog(ByteBuffer serializedLog) {
-        this.serializedLog = serializedLog;
     }
 
     @Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 588968c..283f69f 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -18,53 +18,42 @@
  */
 package org.apache.asterix.replication.logging;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.management.ReplicationManager;
 
 public class ReplicationLogBuffer {
     private final int logBufferSize;
     private final AtomicBoolean full;
     private int appendOffset;
-    private int flushOffset;
+    private int replicationOffset;
     private final ByteBuffer appendBuffer;
-    private final ByteBuffer flushBuffer;
+    private final ByteBuffer replicationBuffer;
     private boolean stop;
-    private Map<String, SocketChannel> replicaSockets;
     private ReplicationManager replicationManager;
+    private final int batchSize;
 
-    public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize) {
+    public ReplicationLogBuffer(ReplicationManager replicationManager, int logBufferSize, int batchSize) {
         this.replicationManager = replicationManager;
         this.logBufferSize = logBufferSize;
+        this.batchSize = batchSize;
         appendBuffer = ByteBuffer.allocate(logBufferSize);
-        flushBuffer = appendBuffer.duplicate();
+        replicationBuffer = appendBuffer.duplicate();
         full = new AtomicBoolean(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
     }
 
     public void append(ILogRecord logRecord) {
-        appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
-        appendBuffer.putInt(logRecord.getSerializedLogSize());
-        appendBuffer.put(logRecord.getSerializedLog());
+        appendBuffer.putInt(logRecord.getRemoteLogSize());
+        logRecord.writeRemoteLogRecord(appendBuffer);
 
         synchronized (this) {
             appendOffset += getLogReplicationSize(logRecord);
             this.notify();
         }
-    }
-
-    public void setReplicationSockets(Map<String, SocketChannel> replicaSockets) {
-        this.replicaSockets = replicaSockets;
     }
 
     public synchronized void isFull(boolean full) {
@@ -77,18 +66,18 @@
     }
 
     private static int getLogReplicationSize(ILogRecord logRecord) {
-        //request type + request length + serialized log length
-        return Integer.BYTES + Integer.BYTES + logRecord.getSerializedLogSize();
+        //log length (4 bytes) + remote log size
+        return Integer.BYTES + logRecord.getRemoteLogSize();
     }
 
     public void reset() {
         appendBuffer.position(0);
         appendBuffer.limit(logBufferSize);
-        flushBuffer.position(0);
-        flushBuffer.limit(logBufferSize);
+        replicationBuffer.position(0);
+        replicationBuffer.limit(logBufferSize);
         full.set(false);
         appendOffset = 0;
-        flushOffset = 0;
+        replicationOffset = 0;
         stop = false;
     }
 
@@ -96,57 +85,73 @@
         int endOffset;
+        boolean interrupted = false;
         while (!full.get()) {
             synchronized (this) {
-                if (appendOffset - flushOffset == 0 && !full.get()) {
+                if (appendOffset - replicationOffset == 0 && !full.get()) {
                     try {
                         if (stop) {
                             break;
                         }
                         this.wait();
                     } catch (InterruptedException e) {
+                        //re-asserting the interrupt here would make wait() throw at once (busy spin)
+                        interrupted = true;
                         continue;
                     }
                 }
                 endOffset = appendOffset;
             }
-            internalFlush(flushOffset, endOffset);
+            internalFlush(replicationOffset, endOffset);
         }
-
-        internalFlush(flushOffset, appendOffset);
+        internalFlush(replicationOffset, appendOffset);
+        if (interrupted) {
+            //restore the interrupt status for the caller
+            Thread.currentThread().interrupt();
+        }
     }
 
     private void internalFlush(int beginOffset, int endOffset) {
         if (endOffset > beginOffset) {
-            int begingPos = flushBuffer.position();
-            flushBuffer.limit(endOffset);
-            sendRequest(replicaSockets, flushBuffer);
-            flushBuffer.position(begingPos + (endOffset - beginOffset));
-            flushOffset = endOffset;
+            int begingPos = replicationBuffer.position();
+            replicationBuffer.limit(endOffset);
+            transferBuffer(replicationBuffer);
+            replicationBuffer.position(begingPos + (endOffset - beginOffset));
+            replicationOffset = endOffset;
         }
     }
 
-    private void sendRequest(Map<String, SocketChannel> replicaSockets, ByteBuffer requestBuffer) {
-        Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
-        int begin = requestBuffer.position();
-        while (iterator.hasNext()) {
-            Entry<String, SocketChannel> replicaSocket = iterator.next();
-            SocketChannel clientSocket = replicaSocket.getValue();
-            try {
-                NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
-            } catch (IOException e) {
-                if (clientSocket.isOpen()) {
-                    try {
-                        clientSocket.close();
-                    } catch (IOException e2) {
-                        e2.printStackTrace();
-                    }
-                }
-                replicationManager.reportFailedReplica(replicaSocket.getKey());
-                iterator.remove();
-            } finally {
-                requestBuffer.position(begin);
-            }
+    private void transferBuffer(ByteBuffer buffer) {
+        if (buffer.remaining() <= batchSize) {
+            //the current batch can be sent as it is
+            replicationManager.replicateTxnLogBatch(buffer);
+            return;
         }
+        /**
+         * break the batch into smaller batches
+         */
+        int totalTransferLimit = buffer.limit();
+        while (buffer.hasRemaining()) {
+            if (buffer.remaining() > batchSize) {
 
+                //mark the beginning of this batch
+                buffer.mark();
+                int currentBatchSize = 0;
+                //do-while guarantees at least one log per batch, so the outer loop always progresses
+                do {
+                    int logSize = buffer.getInt();
+                    //add the size of the log record itself + 4 bytes for its size
+                    currentBatchSize += logSize + Integer.BYTES;
+                    //go to the beginning of the next log
+                    buffer.position(buffer.position() + logSize);
+                } while (currentBatchSize < batchSize);
+                //set the limit to the end of this batch
+                buffer.limit(buffer.position());
+                //return to the beginning of the batch position
+                buffer.reset();
+            }
+            replicationManager.replicateTxnLogBatch(buffer);
+            //return the original limit to check the new remaining size
+            buffer.limit(totalTransferLimit);
+        }
     }
 
     public boolean isStop() {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
similarity index 71%
rename from asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
rename to asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
index 3312cb1..118fde6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogFlusher.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogReplicator.java
@@ -21,24 +21,22 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
 import java.util.logging.Logger;
-
-import org.apache.asterix.common.transactions.LogRecord;
 
 /**
  * This class is responsible for sending transactions logs to remote replicas.
  */
-public class ReplicationLogFlusher implements Callable<Boolean> {
-    private final static Logger LOGGER = Logger.getLogger(ReplicationLogFlusher.class.getName());
-    private final static ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null,
-            LogRecord.JOB_TERMINATE_LOG_SIZE);
+public class TxnLogReplicator implements Callable<Boolean> {
+    private static final Logger LOGGER = Logger.getLogger(TxnLogReplicator.class.getName());
+    private static final ReplicationLogBuffer POISON_PILL = new ReplicationLogBuffer(null, 0, 0);
     private final LinkedBlockingQueue<ReplicationLogBuffer> emptyQ;
     private final LinkedBlockingQueue<ReplicationLogBuffer> flushQ;
     private ReplicationLogBuffer flushPage;
     private final AtomicBoolean isStarted;
     private final AtomicBoolean terminateFlag;
 
-    public ReplicationLogFlusher(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
+    public TxnLogReplicator(LinkedBlockingQueue<ReplicationLogBuffer> emptyQ,
             LinkedBlockingQueue<ReplicationLogBuffer> flushQ) {
         this.emptyQ = emptyQ;
         this.flushQ = flushQ;
@@ -54,7 +52,7 @@
                 try {
                     isStarted.wait();
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
                 }
             }
         }
@@ -74,34 +72,43 @@
 
     @Override
     public Boolean call() {
-        Thread.currentThread().setName("Replication Log Flusher");
+        Thread.currentThread().setName("TxnLog Replicator");
         synchronized (isStarted) {
             isStarted.set(true);
             isStarted.notify();
         }
-        try {
-            while (true) {
+
+        //interrupts are recorded while looping and restored when the replicator terminates
+        boolean interrupted = false;
+        while (true) {
+            try {
+                if (terminateFlag.get()) {
+                    if (interrupted) {
+                        Thread.currentThread().interrupt();
+                    }
+                    return true;
+                }
+
                 flushPage = null;
-                try {
-                    flushPage = flushQ.take();
-                    if (flushPage == POISON_PILL || terminateFlag.get()) {
-                        return true;
-                    }
-                } catch (InterruptedException e) {
-                    if (flushPage == null) {
-                        continue;
-                    }
+                flushPage = flushQ.take();
+                if (flushPage == POISON_PILL) {
+                    continue;
                 }
                 flushPage.flush();
                 // TODO: pool large pages
                 if (flushPage.getLogBufferSize() == flushPage.getReplicationManager().getLogPageSize()) {
                     emptyQ.offer(flushPage);
                 }
+            } catch (InterruptedException e) {
+                //re-asserting the interrupt here would make the next take() throw at once (busy spin)
+                interrupted = true;
+            } catch (Exception e) {
+                if (LOGGER.isLoggable(Level.SEVERE)) {
+                    LOGGER.log(Level.SEVERE, "TxnLogReplicator is terminating abnormally. Logs Replication Stopped.",
+                            e);
+                }
+                throw e;
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.severe("ReplicationLogFlusher is terminating abnormally. Logs Replication Stopped.");
-            throw e;
         }
     }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
index 6023cb1..62c1e4a 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/NetworkingUtil.java
@@ -21,6 +21,7 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
@@ -30,6 +31,10 @@
 import java.util.Enumeration;
 
 public class NetworkingUtil {
+
+    private NetworkingUtil() {
+        throw new AssertionError("This util class should not be initialized.");
+    }
 
     public static void readBytes(SocketChannel socketChannel, ByteBuffer byteBuffer, int length) throws IOException {
         byteBuffer.clear();
@@ -88,7 +93,8 @@
         return hostName;
     }
 
-    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer) throws IOException {
+    public static void transferBufferToChannel(SocketChannel socketChannel, ByteBuffer requestBuffer)
+            throws IOException {
         while (requestBuffer.hasRemaining()) {
             socketChannel.write(requestBuffer);
         }
@@ -107,4 +113,10 @@
         long fileSize = fileChannel.size();
         fileChannel.transferFrom(socketChannel, pos, fileSize);
     }
+
+    public static InetSocketAddress getSocketAddress(SocketChannel socketChannel) {
+        String hostAddress = socketChannel.socket().getInetAddress().getHostAddress();
+        int port = socketChannel.socket().getPort();
+        return InetSocketAddress.createUnresolved(hostAddress, port);
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index a152f6c..cabfc77 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -83,6 +83,7 @@
 public class ReplicationChannel extends Thread implements IReplicationChannel {
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationChannel.class.getName());
+    private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1;
     private final ExecutorService replicationThreads;
     private final String localNodeID;
     private final ILogManager logManager;
@@ -91,8 +92,8 @@
     private ServerSocketChannel serverSocketChannel = null;
     private final IReplicationManager replicationManager;
     private final AsterixReplicationProperties replicationProperties;
-    private final IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider;
-    private final static int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
+    private final IAsterixAppRuntimeContextProvider appContextProvider;
+    private static final int INTIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
     private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
     private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
     private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
@@ -110,10 +111,10 @@
         this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager;
         this.replicationManager = replicationManager;
         this.replicationProperties = replicationProperties;
-        this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
-        lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<LSMComponentLSNSyncTask>();
+        this.appContextProvider = asterixAppRuntimeContextProvider;
+        lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>();
         pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
-        lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>();
+        lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
         replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
@@ -166,16 +167,17 @@
 
         //clean up when all the LSM component files have been received.
         if (remainingFile == 0) {
-            if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null) {
-                //if this LSN wont be used for any other index, remove it
-                if (replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
-                    int remainingIndexes = replicaUniqueLSN2RemoteMapping
-                            .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
-                    if (remainingIndexes == 0) {
-                        //Note: there is a chance that this is never deleted because some index in the dataset was not flushed because it is empty.
-                        //This could be solved by passing only the number of successfully flushed indexes
-                        replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
-                    }
+            if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
+                    && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
+                int remainingIndexes = replicaUniqueLSN2RemoteMapping
+                        .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
+                if (remainingIndexes == 0) {
+                    /**
+                     * Note: there is a chance that this will never be removed because some
+                     * index in the dataset was not flushed because it is empty. This could
+                     * be solved by passing only the number of successfully flushed indexes.
+                     */
+                    replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
                 }
             }
 
@@ -242,20 +244,23 @@
                         case FLUSH_INDEX:
                             handleFlushIndex();
                             break;
-                        default: {
+                        default:
                             throw new IllegalStateException("Unknown replication request");
-                        }
                     }
                     replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 }
             } catch (Exception e) {
-                e.printStackTrace();
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Unexpected error during replication.", e);
+                }
             } finally {
                 if (socketChannel.isOpen()) {
                     try {
                         socketChannel.close();
                     } catch (IOException e) {
-                        e.printStackTrace();
+                        if (LOGGER.isLoggable(Level.WARNING)) {
+                            LOGGER.log(Level.WARNING, "Failed to close replication socket.", e);
+                        }
                     }
                 }
             }
@@ -263,15 +268,17 @@
 
         private void handleFlushIndex() throws IOException {
             inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            //1. read which indexes are requested to be flushed from remote replica
+            //read which indexes are requested to be flushed from remote replica
             ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
             Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
 
-            //2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
-            IDatasetLifecycleManager datasetLifeCycleManager = asterixAppRuntimeContextProvider
-                    .getDatasetLifecycleManager();
+            /**
+             * check which indexes can be flushed (open indexes) and which cannot be
+             * flushed (closed or have empty memory component).
+             */
+            IDatasetLifecycleManager datasetLifeCycleManager = appContextProvider.getDatasetLifecycleManager();
             List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo();
-            Set<Integer> datasetsToForceFlush = new HashSet<Integer>();
+            Set<Integer> datasetsToForceFlush = new HashSet<>();
             for (IndexInfo iInfo : openIndexesInfo) {
                 if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
                     AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
@@ -281,7 +288,10 @@
                         //remove index to indicate that it will be flushed
                         requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
                     } else if (!((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()) {
-                        //if an index has something to be flushed, then the request to flush it will succeed and we need to schedule it to be flushed.
+                        /**
+                         * if an index has something to be flushed, then the request to flush it
+                         * will succeed and we need to schedule it to be flushed.
+                         */
                         datasetsToForceFlush.add(iInfo.getDatasetId());
                         //remove index to indicate that it will be flushed
                         requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
@@ -289,13 +299,13 @@
                 }
             }
 
-            //3. force flush datasets requested to be flushed
+            //schedule flush for datasets requested to be flushed
             for (int datasetId : datasetsToForceFlush) {
                 datasetLifeCycleManager.flushDataset(datasetId, true);
             }
 
             //the remaining indexes in the requested set are those which cannot be flushed.
-            //4. respond back to the requester that those indexes cannot be flushed
+            //respond back to the requester that those indexes cannot be flushed
             ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
             outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
             NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
@@ -363,7 +373,7 @@
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
             Set<String> requesterExistingFiles = request.getExistingFiles();
-            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider
                     .getAppContext()).getMetadataProperties().getNodePartitions();
             for (String replicaId : replicaIds) {
                 //get replica partitions
@@ -414,50 +424,70 @@
         }
 
         private void handleLogReplication() throws IOException, ACIDException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            //start with a buffer of one log page; readRequest may return a different (e.g. larger) buffer
+            inBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
+            while (true) {
+                //read the next batch of logs from the socket
+                inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+                //a batch holding only the single-byte dummy log marks the end of the handshake
+                if (inBuffer.remaining() == LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE) {
+                    break;
+                }
 
-            //Deserialize log
-            remoteLog.readRemoteLog(inBuffer, false);
-            remoteLog.setLogSource(LogSource.REMOTE);
-
-            switch (remoteLog.getLogType()) {
-                case LogType.UPDATE:
-                case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
-                    //if the log partition belongs to a partitions hosted on this node, replicate it
-                    if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
-                        logManager.log(remoteLog);
-                    }
-                    break;
-                case LogType.JOB_COMMIT:
-                case LogType.ABORT:
-                    LogRecord jobTerminationLog = new LogRecord();
-                    TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
-                            remoteLog.getLogType() == LogType.JOB_COMMIT);
-                    jobTerminationLog.setReplicationThread(this);
-                    jobTerminationLog.setLogSource(LogSource.REMOTE);
-                    logManager.log(jobTerminationLog);
-                    break;
-                case LogType.FLUSH:
-                    //store mapping information for flush logs to use them in incoming LSM components.
-                    RemoteLogMapping flushLogMap = new RemoteLogMapping();
-                    flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
-                    flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                    logManager.log(remoteLog);
-                    //the log LSN value is updated by logManager.log(.) to a local value
-                    flushLogMap.setLocalLSN(remoteLog.getLSN());
-                    flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
-                    replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
-                    synchronized (flushLogslock) {
-                        flushLogslock.notify();
-                    }
-                    break;
-                default:
-                    LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+                processLogsBatch(inBuffer);
             }
         }
 
-        //this method is called sequentially by LogPage (notifyReplicationTerminator) for JOB_COMMIT and JOB_ABORT log types.
+        //deserializes and processes each size-prefixed log in the batch until the buffer is exhausted
+        private void processLogsBatch(ByteBuffer buffer) throws ACIDException {
+            while (buffer.hasRemaining()) {
+                //skip the int log size that precedes each log, then deserialize the log
+                buffer.getInt();
+                remoteLog.readRemoteLog(buffer);
+                remoteLog.setLogSource(LogSource.REMOTE);
+
+                switch (remoteLog.getLogType()) {
+                    case LogType.UPDATE:
+                    case LogType.ENTITY_COMMIT:
+                    case LogType.UPSERT_ENTITY_COMMIT:
+                        //if the log partition belongs to a partitions hosted on this node, replicate it
+                        if (nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
+                            logManager.log(remoteLog);
+                        }
+                        break;
+                    case LogType.JOB_COMMIT:
+                    case LogType.ABORT:
+                        LogRecord jobTerminationLog = new LogRecord();
+                        TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getJobId(),
+                                remoteLog.getLogType() == LogType.JOB_COMMIT);
+                        jobTerminationLog.setReplicationThread(this);
+                        jobTerminationLog.setLogSource(LogSource.REMOTE);
+                        logManager.log(jobTerminationLog);
+                        break;
+                    case LogType.FLUSH:
+                        //store mapping information for flush logs to use them in incoming LSM components.
+                        RemoteLogMapping flushLogMap = new RemoteLogMapping();
+                        flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
+                        flushLogMap.setRemoteLSN(remoteLog.getLSN());
+                        logManager.log(remoteLog);
+                        //the log LSN value is updated by logManager.log(.) to a local value
+                        flushLogMap.setLocalLSN(remoteLog.getLSN());
+                        flushLogMap.numOfFlushedIndexes.set(remoteLog.getNumOfFlushedIndexes());
+                        replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
+                        synchronized (flushLogslock) {
+                            flushLogslock.notify();
+                        }
+                        break;
+                    default:
+                        LOGGER.severe("Unsupported LogType: " + remoteLog.getLogType());
+                }
+            }
+        }
+
+        /**
+         * this method is called sequentially by LogPage (notifyReplicationTerminator)
+         * for JOB_COMMIT and JOB_ABORT log types.
+         */
         @Override
         public void notifyLogReplicationRequester(LogRecord logRecord) {
             pendingNotificationRemoteLogsQ.offer(logRecord);
@@ -480,24 +510,27 @@
                 try {
                     LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
                     //send ACK to requester
-                    try {
-                        logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
-                                .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
-                                        + System.lineSeparator()).getBytes());
-                    } catch (IOException e) {
-                        LOGGER.severe("Failed to send job replication ACK " + logRecord.getLogRecordForDisplay());
+                    logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream()
+                            .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId()
+                                    + System.lineSeparator()).getBytes());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Failed to send job replication ACK", e);
                     }
-                } catch (InterruptedException e1) {
-                    LOGGER.severe("ReplicationNotifier Thread interrupted.");
                 }
             }
         }
     }
 
     /**
-     * This thread is responsible for synchronizing the LSN of the received LSM components to a local LSN.
+     * This thread is responsible for synchronizing the LSN of
+     * the received LSM components to a local LSN.
      */
     private class LSMComponentsSyncService extends Thread {
+        private static final int BULKLOAD_LSN = 0;
+
         @Override
         public void run() {
             Thread.currentThread().setName("LSMComponentsSyncService Thread");
@@ -506,23 +539,24 @@
                 try {
                     LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
                     LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
-                    try {
-                        syncLSMComponentFlushLSN(lsmCompProp, syncTask);
-                        updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
+                    syncLSMComponentFlushLSN(lsmCompProp, syncTask);
+                    updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e);
+                    }
                 }
+
             }
         }
 
         private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
-                throws Exception {
+                throws InterruptedException, IOException {
             long remoteLSN = lsmCompProp.getOriginalLSN();
             //LSN=0 (bulkload) does not need to be updated and there is no flush log corresponding to it
-            if (remoteLSN == 0) {
+            if (remoteLSN == BULKLOAD_LSN) {
                 //since this is the first LSM component of this index,
                 //then set the mapping in the LSN_MAP to the current log LSN because
                 //no other log could've been received for this index since bulkload replication is synchronous.
@@ -536,16 +570,26 @@
                 if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
                     //need to look up LSN mapping from memory
                     RemoteLogMapping remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                    if (remoteLogMap == null) {
+                    //wait until the flush log arrives, or until the LSM component file is deleted (its NC failed)
+                    //NOTE(review): Files.exists(path) is re-evaluated only after flushLogslock.wait() returns
+                    while (remoteLogMap == null && Files.exists(path)) {
                         synchronized (flushLogslock) {
-                            remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                            //wait until flush log arrives, and verify the LSM component file still exists
-                            //The component file could be deleted if its NC fails.
-                            while (remoteLogMap == null && Files.exists(path)) {
-                                flushLogslock.wait();
-                                remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
-                            }
+                            //re-check under the lock: a notify() sent between the unsynchronized check
+                            //in the loop condition and wait() would otherwise be missed (lost wake-up)
+                            remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+                            if (remoteLogMap == null) {
+                                flushLogslock.wait();
+                            }
                         }
+                        remoteLogMap = replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
+                    }
+
+                    /**
+                     * file has been deleted due to its remote primary replica failure
+                     * before its LSN could've been synchronized.
+                     */
+                    if (remoteLogMap == null) {
+                        return;
                     }
                     lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
                 } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
@@ -554,13 +598,14 @@
                             .getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(replicaResourcesManager));
                     Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN());
                     if (mappingLSN == null) {
-                        /*
-                         * this shouldn't happen unless this node just recovered and the first component it received
-                         * is a merged component due to an on-going merge operation while recovery on the remote replica.
-                         * In this case, we use the current append LSN since no new records exist for this index,
-                         * otherwise they would've been flushed.
-                         * This could be prevented by waiting for any IO to finish on the remote replica during recovery.
-                         *
+                        /**
+                         * this shouldn't happen unless this node just recovered and
+                         * the first component it received is a merged component due
+                         * to an on-going merge operation while recovery on the remote
+                         * replica. In this case, we use the current append LSN since
+                         * no new records exist for this index, otherwise they would've
+                         * been flushed. This could be prevented by waiting for any IO
+                         * to finish on the remote replica during recovery.
                          */
                         mappingLSN = logManager.getAppendLSN();
                     }
@@ -569,9 +614,10 @@
             }
 
             if (Files.notExists(path)) {
-                /*
-                 * This could happen when a merged component arrives and deletes the flushed
-                 * component (which we are trying to update) before its flush log arrives since logs and components are received
+                /**
+                 * This could happen when a merged component arrives and deletes
+                 * the flushed component (which we are trying to update) before
+                 * its flush log arrives since logs and components are received
                  * on different threads.
                  */
                 return;
@@ -594,4 +640,4 @@
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index ee872a5..5ba6ad2 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -71,7 +71,7 @@
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
-import org.apache.asterix.replication.logging.ReplicationLogFlusher;
+import org.apache.asterix.replication.logging.TxnLogReplicator;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
 import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
@@ -86,6 +86,8 @@
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.util.StorageUtil;
+import org.apache.hyracks.util.StorageUtil.StorageUnit;
 
 /**
  * This class is used to process replication jobs and maintain remote replicas states
@@ -93,7 +95,7 @@
 public class ReplicationManager implements IReplicationManager {
 
     private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
-    private final int INITIAL_REPLICATION_FACTOR = 1;
+    private static final int INITIAL_REPLICATION_FACTOR = 1;
     private final String nodeId;
     private ExecutorService replicationListenerThreads;
     private final Map<Integer, Set<String>> jobCommitAcks;
@@ -114,7 +116,7 @@
     private final AtomicBoolean replicationSuspended;
     private AtomicBoolean terminateJobsReplication;
     private AtomicBoolean jobsReplicationSuspended;
-    private final static int INITIAL_BUFFER_SIZE = 4000; //4KB
+    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getSizeInBytes(4, StorageUnit.KILOBYTE);
     private final Set<String> shuttingDownReplicaIds;
     //replication threads
     private ReplicationJobsProccessor replicationJobsProcessor;
@@ -128,9 +130,10 @@
     private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
     private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
     protected ReplicationLogBuffer currentTxnLogBuffer;
-    private ReplicationLogFlusher txnlogsReplicator;
+    private TxnLogReplicator txnlogReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
-    private Map<String, SocketChannel> logsReplicaSockets = null;
+    private SocketChannel[] logsRepSockets;
+    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
 
     //TODO this class needs to be refactored by moving its private classes to separate files
     //and possibly using MessageBroker to send/receive remote replicas events.
@@ -143,15 +146,15 @@
         this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider;
         this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(nodeId).substring(0, 3);
         this.logManager = logManager;
-        replicationJobsQ = new LinkedBlockingQueue<IReplicationJob>();
-        replicaEventsQ = new LinkedBlockingQueue<ReplicaEvent>();
+        replicationJobsQ = new LinkedBlockingQueue<>();
+        replicaEventsQ = new LinkedBlockingQueue<>();
         terminateJobsReplication = new AtomicBoolean(false);
         jobsReplicationSuspended = new AtomicBoolean(true);
         replicationSuspended = new AtomicBoolean(true);
-        replicas = new HashMap<String, Replica>();
-        jobCommitAcks = new ConcurrentHashMap<Integer, Set<String>>();
-        replicationJobsPendingAcks = new ConcurrentHashMap<Integer, ILogRecord>();
-        shuttingDownReplicaIds = new HashSet<String>();
+        replicas = new HashMap<>();
+        jobCommitAcks = new ConcurrentHashMap<>();
+        replicationJobsPendingAcks = new ConcurrentHashMap<>();
+        shuttingDownReplicaIds = new HashSet<>();
         dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
 
         //Used as async listeners from replicas
@@ -179,13 +182,14 @@
             clientPartitonsSet.addAll(clientPartitions);
             replica2PartitionsMap.put(replica.getId(), clientPartitonsSet);
         }
-        int numLogBuffers = logManager.getNumLogPages();
-        emptyLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
-        pendingFlushLogBuffersQ = new LinkedBlockingQueue<ReplicationLogBuffer>(numLogBuffers);
+        int numLogBuffers = replicationProperties.getLogBufferNumOfPages();
+        emptyLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
+        pendingFlushLogBuffersQ = new LinkedBlockingQueue<>(numLogBuffers);
 
-        int logBufferSize = logManager.getLogPageSize();
+        int logBufferSize = replicationProperties.getLogBufferPageSize();
         for (int i = 0; i < numLogBuffers; i++) {
-            emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferSize));
+            emptyLogBuffersQ
+                    .offer(new ReplicationLogBuffer(this, logBufferSize, replicationProperties.getLogBatchSize()));
         }
     }
 
@@ -200,7 +204,7 @@
                     try {
                         replicationSuspended.wait();
                     } catch (InterruptedException e) {
-                        //ignore
+                        Thread.currentThread().interrupt();
                     }
                 }
             }
@@ -209,16 +213,12 @@
     }
 
     @Override
-    public void replicateLog(ILogRecord logRecord) {
+    public void replicateLog(ILogRecord logRecord) throws InterruptedException {
         if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
             //if replication is suspended, wait until it is resumed.
             while (replicationSuspended.get()) {
                 synchronized (replicationSuspended) {
-                    try {
-                        replicationSuspended.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
-                    }
+                    replicationSuspended.wait();
                 }
             }
             Set<String> replicaIds = Collections.synchronizedSet(new HashSet<String>());
@@ -232,29 +232,23 @@
     protected void getAndInitNewLargePage(int pageSize) {
         // for now, alloc a new buffer for each large page
         // TODO: consider pooling large pages
-        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize);
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
+        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, replicationProperties.getLogBufferPageSize());
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    protected void getAndInitNewPage() {
+    protected void getAndInitNewPage() throws InterruptedException {
         currentTxnLogBuffer = null;
         while (currentTxnLogBuffer == null) {
-            try {
-                currentTxnLogBuffer = emptyLogBuffersQ.take();
-            } catch (InterruptedException e) {
-                //ignore
-            }
+            currentTxnLogBuffer = emptyLogBuffersQ.take(); //take() blocks and never returns null
         }
         currentTxnLogBuffer.reset();
-        currentTxnLogBuffer.setReplicationSockets(logsReplicaSockets);
         pendingFlushLogBuffersQ.offer(currentTxnLogBuffer);
     }
 
-    private synchronized void appendToLogBuffer(ILogRecord logRecord) {
+    private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
         if (!currentTxnLogBuffer.hasSpace(logRecord)) {
             currentTxnLogBuffer.isFull(true);
-            if (logRecord.getLogSize() > logManager.getLogPageSize()) {
+            if (logRecord.getLogSize() > getLogPageSize()) {
                 getAndInitNewLargePage(logRecord.getLogSize());
             } else {
                 getAndInitNewPage();
@@ -326,7 +320,10 @@
                             long fileSize = fileChannel.size();
 
                             if (LSMComponentJob != null) {
-                                //since this is LSM_COMPONENT REPLICATE job, the job will contain only the component being replicated.
+                                /**
+                                 * since this is LSM_COMPONENT REPLICATE job, the job will contain
+                                 * only the component being replicated.
+                                 */
                                 ILSMComponent diskComponent = LSMComponentJob.getLSMIndexOperationContext()
                                         .getComponentsToBeReplicated().get(0);
                                 long LSNByteOffset = AsterixLSMIndexUtil.getComponentFileLSNOffset(
@@ -362,7 +359,7 @@
                                         }
                                     }
                                 } catch (IOException e) {
-                                    reportFailedReplica(entry.getKey());
+                                    handleReplicationFailure(socketChannel, e);
                                     iterator.remove();
                                 } finally {
                                     requestBuffer.position(0);
@@ -392,7 +389,7 @@
                                     waitForResponse(socketChannel, responseBuffer);
                                 }
                             } catch (IOException e) {
-                                reportFailedReplica(entry.getKey());
+                                handleReplicationFailure(socketChannel, e);
                                 iterator.remove();
                             } finally {
                                 requestBuffer.position(0);
@@ -458,7 +455,7 @@
     }
 
     /**
-     * Suspends proccessing replication jobs.
+     * Suspends processing replication jobs/logs.
      *
      * @param force
      *            a flag indicates if replication should be suspended right away or when the pending jobs are completed.
@@ -477,60 +474,134 @@
                     try {
                         jobsReplicationSuspended.wait();
                     } catch (InterruptedException e) {
-                        //ignore
+                        Thread.currentThread().interrupt();
                     }
                 }
             }
         }
 
         //suspend logs replication
-        if (txnlogsReplicator != null) {
-            terminateTxnLogsReplicator();
+        if (txnlogReplicator != null) {
+            endTxnLogReplicationHandshake();
         }
     }
 
     /**
      * Opens a new connection with Active remote replicas and starts a listen thread per connection.
      */
-    private void establishTxnLogsReplicationConnection() {
-        logsReplicaSockets = getActiveRemoteReplicasSockets();
+    private void establishTxnLogReplicationHandshake() {
+        Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets();
+        logsRepSockets = new SocketChannel[activeRemoteReplicasSockets.size()];
+        int i = 0;
         //start a listener thread per connection
-        for (Entry<String, SocketChannel> entry : logsReplicaSockets.entrySet()) {
+        for (Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) {
+            logsRepSockets[i] = entry.getValue();
             replicationListenerThreads
                     .execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
+            i++;
+        }
+
+        //establish the log replication handshake: each replica is sent the REPLICATE_LOG
+        //request type once, after which log batches follow on the same socket until
+        //endTxnLogReplicationHandshake sends the end-of-handshake log
+        ByteBuffer handshakeBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE)
+                .putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+        handshakeBuffer.flip();
+        //send handshake request; NOTE(review): a socket that fails here is closed but stays in logsRepSockets
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                NetworkingUtil.transferBufferToChannel(replicaSocket, handshakeBuffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                handshakeBuffer.position(0);
+            }
         }
     }
 
+    /**
+     * Logs the failure, closes the socket if it is still open, and reports the replica
+     * resolved by getReplicaIdBySocket as failed.
+     */
+    private void handleReplicationFailure(SocketChannel socketChannel, Throwable t) {
+        LOGGER.log(Level.WARNING, "Could not complete replication request.", t);
+        try {
+            if (socketChannel.isOpen()) {
+                socketChannel.close();
+            }
+        } catch (IOException closeFailure) {
+            LOGGER.log(Level.WARNING, "Could not close socket.", closeFailure);
+        }
+        reportFailedReplica(getReplicaIdBySocket(socketChannel));
+    }
+
     /**
-     * Stops ReplicationFlusherThread and closes the sockets used to replicate logs.
+     * Stops TxnLogReplicator and closes the sockets used to replicate logs.
      */
-    private void terminateTxnLogsReplicator() {
-        LOGGER.log(Level.INFO, "Terminating ReplicationLogFlusher thread ...");
-        txnlogsReplicator.terminate();
+    private void endTxnLogReplicationHandshake() {
+        LOGGER.info("Terminating TxnLogReplicator thread ...");
+        txnlogReplicator.terminate();
         try {
             txnLogReplicatorTask.get();
         } catch (ExecutionException | InterruptedException e) {
-            LOGGER.log(Level.WARNING, "RepicationLogFlusher thread terminated abnormally");
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, "TxnLogReplicator thread terminated abnormally", e);
+            }
         }
-        LOGGER.log(Level.INFO, "LogFlusher thread is terminated.");
 
-        if (logsReplicaSockets != null) {
-            //wait for any ACK to arrive before closing sockets.
+        LOGGER.info("TxnLogReplicator thread was terminated.");
+        if (logsRepSockets == null) {
+            return;
+        }
+
+        //end log replication handshake (by sending a dummy log with a single byte)
+        ByteBuffer endLogRepHandshake = ByteBuffer.allocate(Integer.BYTES + 1).putInt(1).put((byte) 0);
+        endLogRepHandshake.flip();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                NetworkingUtil.transferBufferToChannel(replicaSocket, endLogRepHandshake);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                endLogRepHandshake.position(0);
+            }
+        }
+
+        //wait for any ACK to arrive before closing sockets.
+        boolean interrupted = false;
+        if (logsRepSockets != null) {
             synchronized (jobCommitAcks) {
-                while (jobCommitAcks.size() != 0) {
-                    try {
+                try {
+                    while (jobCommitAcks.size() != 0) {
                         jobCommitAcks.wait();
-                    } catch (InterruptedException e) {
-                        //ignore
                     }
+                } catch (InterruptedException e) {
+                    if (LOGGER.isLoggable(Level.SEVERE)) {
+                        LOGGER.log(Level.SEVERE, "Interrupted while waiting for jobs ACK", e);
+                    }
+                    interrupted = true;
                 }
             }
-
-            //close log replication sockets
-            closeReplicaSockets(logsReplicaSockets);
-            logsReplicaSockets = null;
         }
+
+        //close log replication sockets
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                //send goodbye to remote replica
+                NetworkingUtil.transferBufferToChannel(replicaSocket, goodbyeBuffer);
+                replicaSocket.close();
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
+            } finally {
+                goodbyeBuffer.position(0);
+            }
+        }
+        logsRepSockets = null;
+        //re-interrupt only now: blocking SocketChannel I/O fails with ClosedByInterruptException if set
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
 
     /**
@@ -567,14 +638,7 @@
             try {
                 NetworkingUtil.transferBufferToChannel(clientSocket, requestBuffer);
             } catch (IOException e) {
-                if (clientSocket.isOpen()) {
-                    try {
-                        clientSocket.close();
-                    } catch (IOException e2) {
-                        e2.printStackTrace();
-                    }
-                }
-                reportFailedReplica(replicaSocket.getKey());
+                handleReplicationFailure(clientSocket, e);
                 iterator.remove();
             } finally {
                 requestBuffer.position(0);
@@ -600,7 +664,7 @@
                 try {
                     clientSocket.close();
                 } catch (IOException e) {
-                    e.printStackTrace();
+                    handleReplicationFailure(clientSocket, e);
                 }
             }
         }
@@ -636,7 +700,7 @@
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {
-                    e.printStackTrace();
+                    Thread.currentThread().interrupt();
                 }
             }
         }
@@ -651,8 +715,10 @@
      *            The new state of the replica.
      * @param suspendReplication
      *            a flag indicating whether to suspend replication on state change or not.
+     * @throws InterruptedException
      */
-    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication) {
+    public synchronized void updateReplicaState(String replicaId, ReplicaState newState, boolean suspendReplication)
+            throws InterruptedException {
         Replica replica = replicas.get(replicaId);
 
         if (replica.getState() == newState) {
@@ -680,10 +746,8 @@
 
         if (newState == ReplicaState.ACTIVE) {
             replicationFactor++;
-        } else if (newState == ReplicaState.DEAD) {
-            if (replicationFactor > INITIAL_REPLICATION_FACTOR) {
-                replicationFactor--;
-            }
+        } else if (newState == ReplicaState.DEAD && replicationFactor > INITIAL_REPLICATION_FACTOR) {
+            replicationFactor--;
         }
 
         LOGGER.log(Level.WARNING, "Replica " + replicaId + " state changed to: " + newState.name()
@@ -702,22 +766,24 @@
      *            The remote replica id the ACK received from.
      */
     private void addAckToJob(int jobId, String replicaId) {
-        //add ACK to the job
-        if (jobCommitAcks.containsKey(jobId)) {
-            Set<String> replicaIds = jobCommitAcks.get(jobId);
-            replicaIds.add(replicaId);
-        } else {
-            throw new IllegalStateException("Job ID not found in pending job commits  " + jobId);
-        }
+        synchronized (jobCommitAcks) {
+            //add ACK to the job
+            if (jobCommitAcks.containsKey(jobId)) {
+                Set<String> replicaIds = jobCommitAcks.get(jobId);
+                replicaIds.add(replicaId);
+            } else {
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.warning("Invalid job replication ACK received for jobId(" + jobId + ")");
+                }
+                return;
+            }
 
-        //if got ACKs from all remote replicas, notify pending jobs if any
-        if (jobCommitAcks.get(jobId).size() == replicationFactor) {
-            synchronized (replicationJobsPendingAcks) {
-                if (replicationJobsPendingAcks.containsKey(jobId)) {
-                    ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
-                    synchronized (pendingLog) {
-                        pendingLog.notify();
-                    }
+            //if got ACKs from all remote replicas, notify pending jobs if any
+
+            if (jobCommitAcks.get(jobId).size() == replicationFactor && replicationJobsPendingAcks.containsKey(jobId)) {
+                ILogRecord pendingLog = replicationJobsPendingAcks.get(jobId);
+                synchronized (pendingLog) {
+                    pendingLog.notify();
                 }
             }
         }
@@ -725,26 +791,25 @@
 
     @Override
     public boolean hasBeenReplicated(ILogRecord logRecord) {
-        if (jobCommitAcks.containsKey(logRecord.getJobId())) {
-            //check if all ACKs have been received
-            if (jobCommitAcks.get(logRecord.getJobId()).size() == replicationFactor) {
-                jobCommitAcks.remove(logRecord.getJobId());
+        int jobId = logRecord.getJobId();
+        if (jobCommitAcks.containsKey(jobId)) {
+            synchronized (jobCommitAcks) {
+                //check if all ACKs have been received
+                if (jobCommitAcks.get(jobId).size() == replicationFactor) {
+                    jobCommitAcks.remove(jobId);
 
-                if (replicationJobsPendingAcks.containsKey(logRecord.getJobId())) {
-                    replicationJobsPendingAcks.remove(logRecord);
-                }
+                    //remove from pending jobs if exists
+                    replicationJobsPendingAcks.remove(jobId);
 
-                //notify any threads waiting for all jobs to finish
-                if (jobCommitAcks.size() == 0) {
-                    synchronized (jobCommitAcks) {
+                    //notify any threads waiting for all jobs to finish
+                    if (jobCommitAcks.size() == 0) {
                         jobCommitAcks.notifyAll();
                     }
+                    return true;
+                } else {
+                    replicationJobsPendingAcks.putIfAbsent(jobId, logRecord);
+                    return false;
                 }
-
-                return true;
-            } else {
-                replicationJobsPendingAcks.putIfAbsent(logRecord.getJobId(), logRecord);
-                return false;
             }
         }
 
@@ -753,13 +818,16 @@
     }
 
     private Map<String, SocketChannel> getActiveRemoteReplicasSockets() {
-        Map<String, SocketChannel> replicaNodesSockets = new HashMap<String, SocketChannel>();
+        Map<String, SocketChannel> replicaNodesSockets = new HashMap<>();
         for (Replica replica : replicas.values()) {
             if (replica.getState() == ReplicaState.ACTIVE) {
                 try {
                     SocketChannel sc = getReplicaSocket(replica.getId());
                     replicaNodesSockets.put(replica.getId(), sc);
                 } catch (IOException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Could not get replica socket", e);
+                    }
                     reportFailedReplica(replica.getId());
                 }
             }
@@ -776,7 +844,7 @@
      * @throws IOException
      */
     private SocketChannel getReplicaSocket(String replicaId) throws IOException {
-        Replica replica = replicas.get(replicaId);
+        Replica replica = replicationProperties.getReplicaById(replicaId);
         SocketChannel sc = SocketChannel.open();
         sc.configureBlocking(true);
         InetSocketAddress address = replica.getAddress(replicationProperties);
@@ -786,7 +854,7 @@
 
     @Override
     public Set<String> getDeadReplicasIds() {
-        Set<String> replicasIds = new HashSet<String>();
+        Set<String> replicasIds = new HashSet<>();
         for (Replica replica : replicas.values()) {
             if (replica.getState() == ReplicaState.DEAD) {
                 replicasIds.add(replica.getNode().getId());
@@ -797,7 +865,7 @@
 
     @Override
     public Set<String> getActiveReplicasIds() {
-        Set<String> replicasIds = new HashSet<String>();
+        Set<String> replicasIds = new HashSet<>();
         for (Replica replica : replicas.values()) {
             if (replica.getState() == ReplicaState.ACTIVE) {
                 replicasIds.add(replica.getNode().getId());
@@ -823,40 +891,35 @@
 
     /**
      * Called during NC shutdown to notify remote replicas about the shutdown
-     * and wait for remote replicas shutdown notification then closes the local replication channel.
+     * and waits until every active remote replica has sent its shutdown notification, then closes the local
+     * replication channel.
      */
     @Override
     public void stop(boolean dumpState, OutputStream ouputStream) throws IOException {
-        try {
-            //stop replication thread afters all jobs/logs have been processed
-            suspendReplication(false);
-            //send shutdown event to remote replicas
-            sendShutdownNotifiction();
-            //wait until all shutdown events come from all remote replicas
-            synchronized (shuttingDownReplicaIds) {
-                while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
-                    try {
-                        shuttingDownReplicaIds.wait(1000);
-                    } catch (InterruptedException e) {
-                        e.printStackTrace();
-                    }
+        //stop replication thread after all jobs/logs have been processed
+        suspendReplication(false);
+        //send shutdown event to remote replicas
+        sendShutdownNotifiction();
+        //wait until shutdown notifications have arrived from all currently active remote replicas
+        synchronized (shuttingDownReplicaIds) {
+            while (!shuttingDownReplicaIds.containsAll(getActiveReplicasIds())) {
+                try {
+                    shuttingDownReplicaIds.wait(1000); // wakes at least once per second to re-check the condition
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // NOTE(review): the flag stays set, so every later wait() throws immediately and this loop busy-spins until the condition holds
                 }
             }
-            LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
-            //close replication channel
-            asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
-
-            LOGGER.log(Level.INFO, "Replication manager stopped.");
-        } catch (Exception e) {
-            e.printStackTrace();
         }
+        LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
+        //close replication channel
+        asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close(); // any exception from close() now propagates to the caller (the old catch-all was removed)
+
+        LOGGER.log(Level.INFO, "Replication manager stopped.");
     }
 
     @Override
     public void reportReplicaEvent(ReplicaEvent event) {
-        synchronized (replicaEventsQ) {
-            replicaEventsQ.offer(event);
-        }
+        replicaEventsQ.offer(event); // NOTE(review): no external lock any more; assumes replicaEventsQ is a thread-safe queue, confirm its declared type
     }
 
     /**
@@ -867,6 +930,9 @@
      */
     public void reportFailedReplica(String replicaId) {
         Replica replica = replicas.get(replicaId);
+        if (replica == null) {
+            return;
+        }
         if (replica.getState() == ReplicaState.DEAD) {
             return;
         }
@@ -878,16 +944,28 @@
         reportReplicaEvent(event);
     }
 
+    private String getReplicaIdBySocket(SocketChannel socketChannel) { // maps a socket to the id of the replica whose configured host name and port match it, or null
+        InetSocketAddress socketAddress = NetworkingUtil.getSocketAddress(socketChannel);
+        for (Replica replica : replicas.values()) {
+            InetSocketAddress replicaAddress = replica.getAddress(replicationProperties);
+            if (replicaAddress.getHostName().equals(socketAddress.getHostName()) // NOTE(review): getHostName() may do a reverse DNS lookup; differently-resolved names will not match
+                    && replicaAddress.getPort() == socketAddress.getPort()) {
+                return replica.getId();
+            }
+        }
+        return null; // no replica matched; callers must handle null
+    }
+
     @Override
-    public void startReplicationThreads() {
+    public void startReplicationThreads() throws InterruptedException {
         replicationJobsProcessor = new ReplicationJobsProccessor();
 
         //start/continue processing jobs/logs
-        if (logsReplicaSockets == null) {
-            establishTxnLogsReplicationConnection();
+        if (logsRepSockets == null) {
+            establishTxnLogReplicationHandshake();
             getAndInitNewPage();
-            txnlogsReplicator = new ReplicationLogFlusher(emptyLogBuffersQ, pendingFlushLogBuffersQ);
-            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogsReplicator);
+            txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
+            txnLogReplicatorTask = asterixAppRuntimeContextProvider.getThreadExecutor().submit(txnlogReplicator);
         }
 
         replicationJobsProcessor.start();
@@ -936,7 +1014,11 @@
                     ReplicationProtocol.sendGoodbye(socketChannel);
                 }
 
-                //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
+                /**
+                 * 4. update the LSN_MAP for indexes that were not flushed
+                 * to the current append LSN to indicate no operations happened
+                 * since the checkpoint start.
+                 */
                 if (laggingIndexesResponse != null) {
                     for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
                         String indexPath = laggingIndexes.get(resouceId);
@@ -955,7 +1037,7 @@
         long maxRemoteLSN = 0;
 
         ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
-        Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
+        Map<String, SocketChannel> replicaSockets = new HashMap<>();
         try {
             for (String replicaId : remoteReplicas) {
                 replicaSockets.put(replicaId, getReplicaSocket(replicaId));
@@ -1037,7 +1119,42 @@
     }
 
     public int getLogPageSize() {
-        return logManager.getLogPageSize();
+        return replicationProperties.getLogBufferPageSize(); // page size now comes from the replication properties instead of the log manager
+    }
+
+    @Override
+    public void replicateTxnLogBatch(final ByteBuffer buffer) { // sends buffer's position..limit bytes to every log replication socket, each prefixed by a 4-byte length
+        //if replication is suspended, wait on replicationSuspended until it is cleared. NOTE(review): get() is read outside the lock, so a notify issued between get() and wait() could be missed; confirm how resume signals
+        try {
+            while (replicationSuspended.get()) {
+                synchronized (replicationSuspended) {
+                    replicationSuspended.wait();
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // NOTE(review): an interrupt ends the wait early and falls through to sending the batch even if still suspended
+        }
+        //prepare the 4-byte batch size header (number of bytes remaining in buffer)
+        txnLogsBatchSizeBuffer.clear();
+        txnLogsBatchSizeBuffer.putInt(buffer.remaining());
+        txnLogsBatchSizeBuffer.flip();
+
+        buffer.mark(); // remember the start so the same bytes can be re-sent to each replica
+        for (SocketChannel replicaSocket : logsRepSockets) {
+            try {
+                //send batch size header
+                NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
+                //send the log batch bytes
+                NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e); // failure handling is delegated; the remaining sockets are still sent the batch
+            } finally {
+                txnLogsBatchSizeBuffer.position(0); // rewind the header for the next replica
+                buffer.reset(); // rewind the batch to the marked start for the next replica
+            }
+        }
+        //mark the whole batch as consumed by advancing position to limit
+        buffer.position(buffer.limit());
     }
 
     //supporting classes
@@ -1068,12 +1185,12 @@
                             break;
                     }
                 } catch (InterruptedException e) {
-                    //ignore
+                    Thread.currentThread().interrupt();
                 }
             }
         }
 
-        public void handleReplicaFailure(String replicaId) {
+        public void handleReplicaFailure(String replicaId) throws InterruptedException {
             Replica replica = replicas.get(replicaId);
 
             if (replica.getState() == ReplicaState.DEAD) {
@@ -1127,12 +1244,16 @@
                     processJob(job, replicaSockets, reusableBuffer);
 
                     //if no more jobs to process, close sockets
-                    if (replicationJobsQ.size() == 0) {
+                    if (replicationJobsQ.isEmpty()) {
                         LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
                         closeSockets();
                     }
-                } catch (Exception e) {
-                    e.printStackTrace();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } catch (IOException e) {
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.log(Level.WARNING, "Couldn't complete processing replication job", e);
+                    }
                 }
             }
 
@@ -1169,25 +1290,25 @@
             Thread.currentThread().setName("TxnLogs Replication Listener Thread");
             LOGGER.log(Level.INFO, "Started listening on socket: " + replicaSocket.socket().getRemoteSocketAddress());
 
-            try {
-                BufferedReader incomingResponse = new BufferedReader(
-                        new InputStreamReader(replicaSocket.socket().getInputStream()));
-                String responseLine = "";
+            try (BufferedReader incomingResponse = new BufferedReader(
+                    new InputStreamReader(replicaSocket.socket().getInputStream()))) {
                 while (true) {
-                    responseLine = incomingResponse.readLine();
+                    String responseLine = incomingResponse.readLine();
                     if (responseLine == null) {
                         break;
                     }
                     //read ACK for job commit log
-                    String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+                    String ackFrom = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
                     int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
-                    addAckToJob(jobId, replicaId);
+                    addAckToJob(jobId, ackFrom);
                 }
-            } catch (AsynchronousCloseException e1) {
-                LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId);
-            } catch (IOException e2) {
-                reportFailedReplica(replicaId);
+            } catch (AsynchronousCloseException e) {
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + replicaId, e);
+                }
+            } catch (IOException e) {
+                handleReplicationFailure(replicaSocket, e);
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 47e60b2..4da5fd4 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -63,8 +63,8 @@
 
         Set<String> nodes = replicationProperties.getNodeReplicationClients(localNodeId);
 
-        Map<String, Set<String>> recoveryCandidates = new HashMap<String, Set<String>>();
-        Map<String, Integer> candidatesScore = new HashMap<String, Integer>();
+        Map<String, Set<String>> recoveryCandidates = new HashMap<>();
+        Map<String, Integer> candidatesScore = new HashMap<>();
 
         //2. identify which nodes has backup per lost node data
         for (String node : nodes) {
@@ -80,7 +80,7 @@
             }
 
             //no active replicas to recover from
-            if (locations.size() == 0) {
+            if (locations.isEmpty()) {
                 throw new IllegalStateException("Could not find any ACTIVE replica to recover " + node + " data.");
             }
 
@@ -94,7 +94,7 @@
             recoveryCandidates.put(node, locations);
         }
 
-        Map<String, Set<String>> recoveryList = new HashMap<String, Set<String>>();
+        Map<String, Set<String>> recoveryList = new HashMap<>();
 
         //3. find best candidate to recover from per lost replica data
         for (Entry<String, Set<String>> entry : recoveryCandidates.entrySet()) {
@@ -113,7 +113,7 @@
             if (recoveryList.containsKey(winner)) {
                 recoveryList.get(winner).add(entry.getKey());
             } else {
-                Set<String> nodesToRecover = new HashSet<String>();
+                Set<String> nodesToRecover = new HashSet<>();
                 nodesToRecover.add(entry.getKey());
                 recoveryList.put(winner, nodesToRecover);
             }
@@ -196,15 +196,16 @@
                 }
                 break;
             } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...");
+                if (LOGGER.isLoggable(Level.WARNING)) {
+                    LOGGER.log(Level.WARNING, "Failed during remote recovery. Attempting again...", e);
+                }
                 maxRecoveryAttempts--;
             }
         }
     }
 
     @Override
-    public void completeFailbackProcess() throws IOException {
+    public void completeFailbackProcess() throws IOException, InterruptedException {
         ILogManager logManager = runtimeContext.getTransactionSubsystem().getLogManager();
         ReplicaResourcesManager replicaResourcesManager = (ReplicaResourcesManager) runtimeContext
                 .getReplicaResourcesManager();
@@ -237,7 +238,9 @@
              * in case of failure during failback completion process we need to construct a new plan
              * and get all the files from the start since the remote replicas will change in the new plan.
              */
-            e.printStackTrace();
+            if (LOGGER.isLoggable(Level.WARNING)) {
+                LOGGER.log(Level.WARNING, "Failed during completing failback. Restarting failback process...", e);
+            }
             startFailbackProcess();
         }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 872adcd..1245674 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -76,9 +76,9 @@
         appendOffset = 0;
         flushOffset = 0;
         isLastPage = false;
-        syncCommitQ = new LinkedBlockingQueue<ILogRecord>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
-        flushQ = new LinkedBlockingQueue<ILogRecord>();
-        remoteJobsQ = new LinkedBlockingQueue<ILogRecord>();
+        syncCommitQ = new LinkedBlockingQueue<>(logPageSize / ILogRecord.JOB_TERMINATE_LOG_SIZE);
+        flushQ = new LinkedBlockingQueue<>();
+        remoteJobsQ = new LinkedBlockingQueue<>();
         reusableDsId = new DatasetId(-1);
         reusableJobId = new JobId(-1);
     }
@@ -113,7 +113,7 @@
 
     @Override
     public void appendWithReplication(ILogRecord logRecord, long appendLSN) {
-        logRecord.writeLogRecord(appendBuffer, appendLSN);
+        logRecord.writeLogRecord(appendBuffer);
 
         if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
                 && logRecord.getLogType() != LogType.WAIT) {
@@ -135,10 +135,9 @@
                     logRecord.isFlushed(false);
                     flushQ.offer(logRecord);
                 }
-            } else if (logRecord.getLogSource() == LogSource.REMOTE) {
-                if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
-                    remoteJobsQ.offer(logRecord);
-                }
+            } else if (logRecord.getLogSource() == LogSource.REMOTE
+                    && (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT)) {
+                remoteJobsQ.offer(logRecord);
             }
             this.notify();
         }
@@ -347,11 +346,7 @@
         IReplicationThread replicationThread = logRecord.getReplicationThread();
 
         if (replicationThread != null) {
-            try {
-                replicationThread.notifyLogReplicationRequester(logRecord);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
+            replicationThread.notifyLogReplicationRequester(logRecord);
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index cacd036..0c4cb88 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -37,10 +37,6 @@
 
     @Override
     public void log(ILogRecord logRecord) throws ACIDException {
-        if (logRecord.getLogSize() > logPageSize) {
-            throw new IllegalStateException();
-        }
-
         //only locally generated logs should be replicated
         logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT);
 
@@ -58,7 +54,11 @@
         syncAppendToLogTail(logRecord);
 
         if (logRecord.isReplicated()) {
-            replicationManager.replicateLog(logRecord);
+            try {
+                replicationManager.replicateLog(logRecord);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
         }
 
         if (logRecord.getLogSource() == LogSource.LOCAL) {
@@ -69,7 +69,7 @@
                         try {
                             logRecord.wait();
                         } catch (InterruptedException e) {
-                            //ignore
+                            Thread.currentThread().interrupt();
                         }
                     }
 
@@ -79,7 +79,7 @@
                             try {
                                 logRecord.wait();
                             } catch (InterruptedException e) {
-                                //ignore
+                                Thread.currentThread().interrupt();
                             }
                         }
                     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Txn Log Replication Optimizations

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.

Change subject: Txn Log Replication Optimizations
......................................................................


Patch Set 4: Code-Review+2

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/883
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I25b5b84eba0cd41ac8e87e71368072879fcf8582
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No