You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2018/01/05 16:10:34 UTC

[06/10] asterixdb git commit: [ASTERIXDB-2195][REPL] Replace Static Replication

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java
new file mode 100644
index 0000000..23e6f3c
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnAckTracker.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.logging;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class TxnAckTracker {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final Map<Long, TxnAck> txnsAcks = new HashMap<>();
+
+    public synchronized void track(ILogRecord logRecord, Set<IReplicationDestination> replicas) {
+        if (replicas.isEmpty()) {
+            logRecord.setReplicated(true);
+            return;
+        }
+        final long txnId = logRecord.getTxnId();
+        //TODO use LSN instead of txnId when all logs have LSN
+        txnsAcks.put(txnId, new TxnAck(logRecord, replicas));
+    }
+
+    public synchronized void ack(long txnId, IReplicationDestination replica) {
+        if (!txnsAcks.containsKey(txnId)) {
+            LOGGER.warn("Received ack for unknown txn {}", txnId);
+            return;
+        }
+        TxnAck txnAcks = txnsAcks.get(txnId);
+        txnAcks.ack(replica);
+        if (txnAcks.allAcked()) {
+            txnsAcks.remove(txnId);
+        }
+    }
+
+    public synchronized void unregister(IReplicationDestination replica) {
+        // assume the ack was received from leaving replicas
+        final HashSet<Long> pendingTxn = new HashSet<>(txnsAcks.keySet());
+        pendingTxn.forEach(txnId -> ack(txnId, replica));
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java
deleted file mode 100644
index f51a64d..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/TxnLogUtil.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.logging;
-
public class TxnLogUtil {

    private TxnLogUtil() {
        // utility class; no instances
    }

    /**
     * Builds a node-unique LSN identifier by concatenating the node id with the LSN.
     *
     * @param nodeId the id of the node that produced the LSN
     * @param LSN    the log sequence number
     * @return Concatenation of nodeId and LSN
     */
    public static String getNodeUniqueLSN(String nodeId, long LSN) {
        // String.valueOf mirrors the null-tolerant behavior of string concatenation
        return String.valueOf(nodeId) + LSN;
    }
}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
new file mode 100644
index 0000000..b6f752f
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/IndexReplicationManager.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.management;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.asterix.replication.api.PartitionReplica;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
+import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.replication.IReplicationJob;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
/**
 * Processes index (LSM component) replication jobs and ships them to the
 * currently registered replica destinations. ASYNC jobs are queued and consumed
 * by a background {@link ReplicationJobsProcessor}; SYNC jobs are processed on
 * the caller's thread. Destination membership and transfers are serialized by
 * {@code transferLock}.
 */
public class IndexReplicationManager {

    private static final Logger LOGGER = LogManager.getLogger();
    private final IReplicationManager replicationManager;
    // currently registered destinations; guarded by transferLock
    private final Set<ReplicationDestination> destinations = new HashSet<>();
    // pending ASYNC jobs consumed by ReplicationJobsProcessor
    private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>();
    private final IReplicationStrategy replicationStrategy;
    private final PersistentLocalResourceRepository resourceRepository;
    private final INcApplicationContext appCtx;
    // serializes transfers and destination membership changes
    private final Object transferLock = new Object();
    // destinations whose failure has already been reported; guarded by transferLock
    private final Set<ReplicationDestination> failedDest = new HashSet<>();

    public IndexReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
        this.appCtx = appCtx;
        this.replicationManager = replicationManager;
        this.resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
        replicationStrategy = replicationManager.getReplicationStrategy();
        // start the background consumer of the ASYNC jobs queue
        appCtx.getThreadExecutor().execute(new ReplicationJobsProcessor());
    }

    /** Adds {@code dest} as a replication destination and clears any recorded failure for it. */
    public void register(ReplicationDestination dest) {
        synchronized (transferLock) {
            LOGGER.info(() -> "register " + dest);
            destinations.add(dest);
            failedDest.remove(dest);
        }
    }

    /** Removes {@code dest} from the replication destinations and its failure record, if any. */
    public void unregister(IReplicationDestination dest) {
        synchronized (transferLock) {
            LOGGER.info(() -> "unregister " + dest);
            destinations.remove(dest);
            failedDest.remove(dest);
        }
    }

    /**
     * Reports a replication failure on {@code dest} at most once (until it is
     * re-registered) and notifies the replication manager.
     */
    private void handleFailure(ReplicationDestination dest, Exception e) {
        synchronized (transferLock) {
            // only report the first failure per registered destination
            if (failedDest.contains(dest)) {
                return;
            }
            LOGGER.error("Replica failed", e);
            if (destinations.contains(dest)) {
                failedDest.add(dest);
            }
            replicationManager.notifyFailure(dest, e);
        }
    }

    /**
     * Accepts a replication job: ASYNC jobs are queued for the background
     * processor; SYNC jobs are processed immediately on the calling thread.
     */
    public void accept(IReplicationJob job) {
        if (job.getExecutionType() == IReplicationJob.ReplicationExecutionType.ASYNC) {
            replicationJobsQ.add(job);
            return;
        }
        process(job);
    }

    /**
     * Ships {@code job} to every destination that hosts a replica of the job's
     * partition. Always invokes {@link #afterReplication(IReplicationJob)} so
     * LSM jobs are properly ended even on failure or skip.
     */
    private void process(IReplicationJob job) {
        try {
            if (skip(job)) {
                return;
            }
            synchronized (transferLock) {
                if (destinations.isEmpty()) {
                    return;
                }
                final IndexSynchronizer synchronizer = new IndexSynchronizer(job, appCtx);
                final int indexPartition = getJobPartition(job);
                for (ReplicationDestination dest : destinations) {
                    try {
                        Optional<IPartitionReplica> partitionReplica = dest.getPartitionReplica(indexPartition);
                        // destination doesn't host a replica of this partition
                        if (!partitionReplica.isPresent()) {
                            continue;
                        }
                        PartitionReplica replica = (PartitionReplica) partitionReplica.get();
                        synchronizer.sync(replica);
                    } catch (Exception e) {
                        handleFailure(dest, e);
                    }
                }
                closeChannels();
            }
        } finally {
            afterReplication(job);
        }
    }

    /**
     * @return true if the job's dataset is not selected by the current
     *         replication strategy and the job should be skipped
     */
    private boolean skip(IReplicationJob job) {
        try {
            final DatasetResourceReference indexFileRef =
                    resourceRepository.getLocalResourceReference(job.getAnyFile());
            return !replicationStrategy.isMatch(indexFileRef.getDatasetId());
        } catch (HyracksDataException e) {
            throw new IllegalStateException("Couldn't find resource for " + job.getAnyFile(), e);
        }
    }

    /** @return the storage partition number the job's files belong to */
    private int getJobPartition(IReplicationJob job) {
        return ResourceReference.of(job.getAnyFile()).getPartitionNum();
    }

    /** Closes replica connections when no more jobs are pending, to free sockets. */
    private void closeChannels() {
        if (!replicationJobsQ.isEmpty()) {
            return;
        }
        LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
        for (ReplicationDestination dest : destinations) {
            dest.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
        }
    }

    /** Ends LSM replication jobs so index components are released; must run for every job. */
    private static void afterReplication(IReplicationJob job) {
        try {
            if (job.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE
                    && job instanceof ILSMIndexReplicationJob) {
                ((ILSMIndexReplicationJob) job).endReplication();
            }
        } catch (HyracksDataException e) {
            throw new ReplicationException(e);
        }
    }

    /** Background consumer that drains the ASYNC jobs queue until interrupted. */
    private class ReplicationJobsProcessor implements Runnable {

        @Override
        public void run() {
            Thread.currentThread().setName(ReplicationJobsProcessor.class.getSimpleName());
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    final IReplicationJob job = replicationJobsQ.take();
                    process(job);
                } catch (InterruptedException e) {
                    LOGGER.warn(() -> ReplicationJobsProcessor.class.getSimpleName() + " interrupted.", e);
                    // restore the interrupt flag so the loop condition observes it
                    Thread.currentThread().interrupt();
                }
            }
            LOGGER.warn("{} stopped.", ReplicationJobsProcessor.class.getSimpleName());
        }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
new file mode 100644
index 0000000..b28c2f7
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/LogReplicationManager.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.management;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.ReplicationProperties;
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IReplicationDestination;
+import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.replication.api.ReplicationDestination;
+import org.apache.asterix.replication.logging.ReplicationLogBuffer;
+import org.apache.asterix.replication.logging.TxnAckTracker;
+import org.apache.asterix.replication.logging.TxnLogReplicator;
+import org.apache.asterix.replication.messaging.ReplicateLogsTask;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
/**
 * Replicates txn log records to remote replicas in batches and tracks their
 * acknowledgements. Log records are appended to {@link ReplicationLogBuffer}s
 * which a background {@link TxnLogReplicator} flushes to all registered
 * destination sockets. Locking: {@code transferLock} serializes batch
 * transfers and membership changes; {@code destinations} is additionally
 * locked for membership reads during {@link #replicate(ILogRecord)}.
 */
public class LogReplicationManager {

    private static final Logger LOGGER = LogManager.getLogger();
    // recycled, ready-to-fill log buffers
    private final LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
    // buffers waiting to be flushed to replicas by TxnLogReplicator
    private final LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
    // reused to send each batch's size header; guarded by transferLock
    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(Integer.BYTES);
    // destination -> its log replication socket
    private final Map<ReplicationDestination, SocketChannel> destinations = new HashMap<>();
    private final IReplicationManager replicationManager;
    private final Executor executor;
    private final TxnAckTracker ackTracker = new TxnAckTracker();
    // sockets whose failure has already been reported; guarded by this
    private final Set<SocketChannel> failedSockets = new HashSet<>();
    private final Object transferLock = new Object();
    private final INcApplicationContext appCtx;
    private final int logPageSize;
    private final int logBatchSize;
    // buffer currently accepting appended log records
    private ReplicationLogBuffer currentTxnLogBuffer;
    // snapshot array of destination sockets used during transfer; rebuilt on membership change
    private SocketChannel[] destSockets;

    public LogReplicationManager(INcApplicationContext appCtx, IReplicationManager replicationManager) {
        this.appCtx = appCtx;
        this.replicationManager = replicationManager;
        final ReplicationProperties replicationProperties = appCtx.getReplicationProperties();
        logPageSize = replicationProperties.getLogBufferPageSize();
        logBatchSize = replicationProperties.getLogBatchSize();
        executor = appCtx.getThreadExecutor();
        emptyLogBuffersQ = new LinkedBlockingQueue<>();
        pendingFlushLogBuffersQ = new LinkedBlockingQueue<>();
        initBuffers(replicationProperties.getLogBufferNumOfPages());
        // background thread that flushes pending buffers to replicas
        TxnLogReplicator txnlogReplicator = new TxnLogReplicator(emptyLogBuffersQ, pendingFlushLogBuffersQ);
        ((ExecutorService) executor).submit(txnlogReplicator);
    }

    /** Pre-allocates {@code buffers} log pages and activates the first one. */
    private void initBuffers(int buffers) {
        for (int i = 0; i < buffers; i++) {
            emptyLogBuffersQ.add(new ReplicationLogBuffer(this, logPageSize, logBatchSize));
        }
        try {
            getAndInitNewPage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplicationException(e);
        }
    }

    /**
     * Registers {@code dest} for log replication: performs the handshake on its
     * log channel, starts an ack listener, and adds its socket to the transfer set.
     */
    public void register(ReplicationDestination dest) {
        synchronized (transferLock) {
            synchronized (destinations) {
                if (destinations.containsKey(dest)) {
                    return;
                }
                LOGGER.info(() -> "register " + dest);
                SocketChannel socketChannel = dest.getLogReplicationChannel();
                handshake(dest, socketChannel);
                destinations.put(dest, socketChannel);
                failedSockets.remove(socketChannel);
                // rebuild the snapshot array used by transferBatch
                destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
            }
        }
    }

    /**
     * Unregisters {@code dest}: treats its pending txns as acked, removes its
     * socket from the transfer set, and sends the end-of-replication marker.
     */
    public void unregister(IReplicationDestination dest) {
        synchronized (transferLock) {
            synchronized (destinations) {
                if (!destinations.containsKey(dest)) {
                    return;
                }
                LOGGER.info(() -> "unregister " + dest);
                ackTracker.unregister(dest);
                SocketChannel destSocket = destinations.remove(dest);
                failedSockets.remove(destSocket);
                // rebuild the snapshot array used by transferBatch
                destSockets = destinations.values().toArray(new SocketChannel[destinations.size()]);
                endReplication(destSocket);
            }
        }
    }

    /**
     * Appends {@code logRecord} for replication. Commit/abort records are also
     * tracked so the record can be marked replicated once all current
     * destinations ack it.
     */
    public void replicate(ILogRecord logRecord) throws InterruptedException {
        if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT) {
            // snapshot the destinations that must ack this txn
            synchronized (destinations) {
                ackTracker.track(logRecord, new HashSet<>(destinations.keySet()));
            }
        }
        appendToLogBuffer(logRecord);
    }

    /**
     * Sends one batch of txn logs (size header then payload) to every
     * destination socket. Called by the TxnLogReplicator flusher thread.
     */
    public void transferBatch(final ByteBuffer buffer) {
        // prepare the batch size buffer
        txnLogsBatchSizeBuffer.clear();
        txnLogsBatchSizeBuffer.putInt(buffer.remaining());
        txnLogsBatchSizeBuffer.flip();

        buffer.mark();
        synchronized (transferLock) {
            if (destSockets != null) {
                for (SocketChannel replicaSocket : destSockets) {
                    try {
                        // send batch size then the batch itself
                        NetworkingUtil.transferBufferToChannel(replicaSocket, txnLogsBatchSizeBuffer);
                        NetworkingUtil.transferBufferToChannel(replicaSocket, buffer);
                    } catch (IOException e) {
                        handleFailure(replicaSocket, e);
                    } finally {
                        // rewind both buffers for the next destination
                        txnLogsBatchSizeBuffer.position(0);
                        buffer.reset();
                    }
                }
            }
        }
        // move the buffer position to the sent limit
        buffer.position(buffer.limit());
    }

    public int getLogPageSize() {
        return logPageSize;
    }

    /** Appends a record to the current buffer, rotating to a new (or oversized) page when full. */
    private synchronized void appendToLogBuffer(ILogRecord logRecord) throws InterruptedException {
        if (!currentTxnLogBuffer.hasSpace(logRecord)) {
            currentTxnLogBuffer.isFull(true);
            if (logRecord.getLogSize() > logPageSize) {
                // record larger than a page; allocate a dedicated large buffer
                getAndInitNewLargePage(logRecord.getLogSize());
            } else {
                getAndInitNewPage();
            }
        }
        currentTxnLogBuffer.append(logRecord);
    }

    /** Takes a recycled buffer from the empty queue and enqueues it for flushing. */
    private void getAndInitNewPage() throws InterruptedException {
        currentTxnLogBuffer = null;
        while (currentTxnLogBuffer == null) {
            currentTxnLogBuffer = emptyLogBuffersQ.take();
        }
        currentTxnLogBuffer.reset();
        pendingFlushLogBuffersQ.add(currentTxnLogBuffer);
    }

    private void getAndInitNewLargePage(int pageSize) {
        // for now, alloc a new buffer for each large page
        currentTxnLogBuffer = new ReplicationLogBuffer(this, pageSize, logBatchSize);
        pendingFlushLogBuffersQ.add(currentTxnLogBuffer);
    }

    /** Sends the ReplicateLogsTask request to the replica and starts listening for its acks. */
    private void handshake(ReplicationDestination dest, SocketChannel socketChannel) {
        final String nodeId = appCtx.getServiceContext().getNodeId();
        final ReplicateLogsTask task = new ReplicateLogsTask(nodeId);
        ReplicationProtocol.sendTo(socketChannel, task, null);
        executor.execute(new TxnAckListener(dest, socketChannel));
    }

    private void endReplication(SocketChannel socketChannel) {
        if (socketChannel.isConnected()) {
            // end log replication (by sending a dummy log with a single byte)
            final ByteBuffer endLogRepBuffer = ReplicationProtocol.getEndLogReplicationBuffer();
            try {
                NetworkingUtil.transferBufferToChannel(socketChannel, endLogRepBuffer);
            } catch (IOException e) {
                LOGGER.warn("Failed to end txn log", e);
            }
        }
    }

    /**
     * Reports a socket failure at most once and notifies the replication
     * manager about the owning destination, if it is still registered.
     */
    private synchronized void handleFailure(SocketChannel replicaSocket, IOException e) {
        if (failedSockets.contains(replicaSocket)) {
            return;
        }
        LOGGER.error("Replica failed", e);
        failedSockets.add(replicaSocket);
        // map the failed socket back to its destination, if still registered
        Optional<ReplicationDestination> socketDest =
                destinations.entrySet().stream().filter(entry -> entry.getValue().equals(replicaSocket))
                        .map(Map.Entry::getKey).findFirst();
        socketDest.ifPresent(dest -> replicationManager.notifyFailure(dest, e));
    }

    /** Reads ack lines from a replica's log channel and forwards them to the ack tracker. */
    private class TxnAckListener implements Runnable {
        private final ReplicationDestination dest;
        private final SocketChannel replicaSocket;

        TxnAckListener(ReplicationDestination dest, SocketChannel replicaSocket) {
            this.dest = dest;
            this.replicaSocket = replicaSocket;
        }

        @Override
        public void run() {
            Thread.currentThread().setName("TxnAckListener (" + dest + ")");
            LOGGER.info("Started listening on socket: {}", dest);
            try (BufferedReader incomingResponse = new BufferedReader(
                    new InputStreamReader(replicaSocket.socket().getInputStream()))) {
                while (true) {
                    final String response = incomingResponse.readLine();
                    // null means the stream ended unexpectedly
                    if (response == null) {
                        handleFailure(replicaSocket, new IOException("Unexpected response from replica " + dest));
                        break;
                    }
                    // read ACK
                    final int txnId = ReplicationProtocol.getTxnIdFromLogAckMessage(response);
                    ackTracker.ack(txnId, dest);
                }
            } catch (AsynchronousCloseException e) {
                // expected when the socket is closed deliberately on unregister
                LOGGER.debug(() -> "Stopped listening on socket:" + dest, e);
            } catch (IOException e) {
                handleFailure(replicaSocket, e);
            }
        }
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
deleted file mode 100644
index e260de5..0000000
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.management;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.concurrent.Callable;
-
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.Replica.ReplicaState;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-
/**
 * Probes a replica's availability by repeatedly attempting a socket connection
 * until it succeeds or the replication timeout elapses, then reports the
 * resulting state (ACTIVE or DEAD) to the replication manager.
 * (Removed by this patch along with the static replication scheme.)
 */
public class ReplicaStateChecker implements Callable<Void> {

    private final int WAIT_TIME = 2000; // ms to wait between connection attempts
    private final Replica replica;
    private final int replicationTimeOut; // seconds before declaring the replica DEAD
    private final ReplicationManager replicationManager;
    private final boolean suspendReplication;

    public ReplicaStateChecker(Replica replica, int replicationTimeOut, ReplicationManager replicationManager,
            boolean suspendReplication) {
        this.replica = replica;
        this.replicationTimeOut = replicationTimeOut;
        this.replicationManager = replicationManager;
        this.suspendReplication = suspendReplication;
    }

    /**
     * Retries connecting to the replica until success (state set to ACTIVE) or
     * timeout (state set to DEAD). Always returns null.
     */
    @Override
    public Void call() throws Exception {
        Thread.currentThread().setName("ReplicaConnector Thread");

        long startTime = System.currentTimeMillis();
        InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replica.getClusterIp(),
                replica.getPort());

        while (true) {
            try (SocketChannel connection = SocketChannel.open()) {
                connection.configureBlocking(true);
                connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
                // connection succeeded; send goodbye and mark the replica ACTIVE
                ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
                connection.write(buffer);
                replicationManager.updateReplicaState(replica.getId(), ReplicaState.ACTIVE, suspendReplication);
                return null;
            } catch (IOException | UnresolvedAddressException e) {
                Thread.sleep(WAIT_TIME);

                //check if connection to replica timed out
                if (((System.currentTimeMillis() - startTime) / 1000) >= replicationTimeOut) {
                    replicationManager.updateReplicaState(replica.getId(), ReplicaState.DEAD, suspendReplication);
                    return null;
                }
            }
        }
    }

}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index c0863e2..f19f1cd 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -18,79 +18,23 @@
  */
 package org.apache.asterix.replication.management;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.context.IndexInfo;
-import org.apache.asterix.common.dataflow.DatasetLocalResource;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
-import org.apache.asterix.common.replication.IReplicationManager;
-import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.common.replication.IReplicationThread;
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.storage.DatasetResourceReference;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
-import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.ResourceReference;
-import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.LogSource;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.asterix.replication.functions.ReplicaFilesRequest;
-import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
-import org.apache.asterix.replication.functions.ReplicationProtocol;
-import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
-import org.apache.asterix.replication.logging.RemoteLogMapping;
-import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
-import org.apache.asterix.replication.messaging.DeleteFileTask;
-import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
-import org.apache.asterix.replication.messaging.ReplicateFileTask;
-import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
-import org.apache.asterix.replication.storage.LSMComponentProperties;
-import org.apache.asterix.replication.storage.LSMIndexFileProperties;
-import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.asterix.transaction.management.service.logging.LogBuffer;
-import org.apache.hyracks.api.application.INCServiceContext;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.replication.logging.RemoteLogsProcessor;
+import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.asterix.replication.messaging.ReplicationProtocol.ReplicationRequestType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
-import org.apache.hyracks.storage.common.LocalResource;
-import org.apache.hyracks.util.StorageUtil;
-import org.apache.hyracks.util.StorageUtil.StorageUnit;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -101,200 +45,74 @@ import org.apache.logging.log4j.Logger;
 public class ReplicationChannel extends Thread implements IReplicationChannel {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1;
-    private final ExecutorService replicationThreads;
-    private final String localNodeID;
-    private final ILogManager logManager;
-    private final ReplicaResourcesManager replicaResourcesManager;
     private ServerSocketChannel serverSocketChannel = null;
-    private final IReplicationManager replicationManager;
-    private final ReplicationProperties replicationProperties;
-    private final IAppRuntimeContextProvider appContextProvider;
-    private static final int INTIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUnit.KILOBYTE);
-    private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
-    private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
-    private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
-    private final Map<Long, RemoteLogMapping> localLsn2RemoteMapping;
-    private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
-    private final LSMComponentsSyncService lsmComponentLSNMappingService;
-    private final ReplicationNotifier replicationNotifier;
-    private final Object flushLogslock = new Object();
-    private final IDatasetLifecycleManager dsLifecycleManager;
-    private final PersistentLocalResourceRepository localResourceRep;
-    private final IReplicationStrategy replicationStrategy;
-    private final NCConfig ncConfig;
-    private Set nodeHostedPartitions;
-    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
     private final INcApplicationContext appCtx;
+    private final RemoteLogsProcessor logsProcessor;
 
-    public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager,
-            IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager,
-            INCServiceContext ncServiceContext, IAppRuntimeContextProvider asterixAppRuntimeContextProvider,
-            IReplicationStrategy replicationStrategy) {
-        this.logManager = logManager;
-        this.localNodeID = nodeId;
-        this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager;
-        this.replicationManager = replicationManager;
-        this.replicationProperties = replicationProperties;
-        this.appContextProvider = asterixAppRuntimeContextProvider;
-        this.dsLifecycleManager = asterixAppRuntimeContextProvider.getDatasetLifecycleManager();
-        this.localResourceRep = (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider
-                .getLocalResourceRepository();
-        this.replicationStrategy = replicationStrategy;
-        this.ncConfig = ((NodeControllerService) ncServiceContext.getControllerService()).getConfiguration();
-        lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>();
-        pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
-        lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
-        replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
-        localLsn2RemoteMapping = new ConcurrentHashMap<>();
-        lsmComponentLSNMappingService = new LSMComponentsSyncService();
-        replicationNotifier = new ReplicationNotifier();
-        replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
-        Map<String, ClusterPartition[]> nodePartitions = asterixAppRuntimeContextProvider.getAppContext()
-                .getMetadataProperties().getNodePartitions();
-        Set<String> nodeReplicationClients = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
-                .map(Replica::getId).collect(Collectors.toSet());
-        List<Integer> clientsPartitions = new ArrayList<>();
-        for (String clientId : nodeReplicationClients) {
-            for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) {
-                clientsPartitions.add(clusterPartition.getPartitionId());
-            }
-        }
-        nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
-        nodeHostedPartitions.addAll(clientsPartitions);
-        this.indexCheckpointManagerProvider =
-                ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
-        this.appCtx = (INcApplicationContext) ncServiceContext.getApplicationContext();
+    public ReplicationChannel(INcApplicationContext appCtx) {
+        this.appCtx = appCtx;
+        logsProcessor = new RemoteLogsProcessor(appCtx);
     }
 
     @Override
     public void run() {
-        Thread.currentThread().setName("Replication Channel Thread");
-
-        String nodeIP = replicationProperties.getNodeIpFromId(localNodeID);
-        int dataPort = ncConfig.getReplicationPublicPort();
+        final String nodeId = appCtx.getServiceContext().getNodeId();
+        Thread.currentThread().setName(nodeId + " Replication Channel Thread");
+        final ReplicationProperties replicationProperties = appCtx.getReplicationProperties();
+        final String nodeIP = replicationProperties.getReplicationAddress();
+        final int dataPort = replicationProperties.getReplicationPort();
         try {
             serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(true);
-            InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP),
-                    dataPort);
+            InetSocketAddress replicationChannelAddress =
+                    new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort);
             serverSocketChannel.socket().bind(replicationChannelAddress);
-            lsmComponentLSNMappingService.start();
-            replicationNotifier.start();
             LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
-
-            //start accepting replication requests
             while (serverSocketChannel.isOpen()) {
                 SocketChannel socketChannel = serverSocketChannel.accept();
                 socketChannel.configureBlocking(true);
                 //start a new thread to handle the request
-                replicationThreads.execute(new ReplicationThread(socketChannel));
+                appCtx.getThreadExecutor().execute(new ReplicationWorker(socketChannel));
             }
         } catch (AsynchronousCloseException e) {
-            LOGGER.warn("Replication channel closed", e);
+            LOGGER.debug("Replication channel closed", e);
         } catch (IOException e) {
-            throw new IllegalStateException(
-                    "Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
+            throw new IllegalStateException("Failed to bind replication channel @ " + nodeIP + ":" + dataPort, e);
         }
     }
 
-    private void updateLSMComponentRemainingFiles(String lsmComponentId) throws IOException {
-        LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(lsmComponentId);
-        int remainingFile = lsmCompProp.markFileComplete();
-
-        //clean up when all the LSM component files have been received.
-        if (remainingFile == 0) {
-            if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
-                    && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
-                int remainingIndexes = replicaUniqueLSN2RemoteMapping
-                        .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
-                if (remainingIndexes == 0) {
-                    /**
-                     * Note: there is a chance that this will never be removed because some
-                     * index in the dataset was not flushed because it is empty. This could
-                     * be solved by passing only the number of successfully flushed indexes.
-                     */
-                    replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
-                }
-            }
-
-            //delete mask to indicate that this component is now valid.
-            replicaResourcesManager.markLSMComponentReplicaAsValid(lsmCompProp);
-            lsmComponentId2PropertiesMap.remove(lsmComponentId);
-            LOGGER.log(Level.INFO, "Completed LSMComponent " + lsmComponentId + " Replication.");
-        }
+    public RemoteLogsProcessor getRemoteLogsProcessor() {
+        return logsProcessor;
     }
 
     @Override
     public void close() throws IOException {
-        serverSocketChannel.close();
-        LOGGER.log(Level.INFO, "Replication channel closed.");
+        if (serverSocketChannel != null) {
+            serverSocketChannel.close();
+            LOGGER.info("Replication channel closed.");
+        }
     }
 
-    /**
-     * A replication thread is created per received replication request.
-     */
-    private class ReplicationThread implements IReplicationThread {
+    private class ReplicationWorker implements IReplicationWorker {
         private final SocketChannel socketChannel;
-        private final LogRecord remoteLog;
-        private ByteBuffer inBuffer;
-        private ByteBuffer outBuffer;
+        private final ByteBuffer inBuffer;
+        private final ByteBuffer outBuffer;
 
-        public ReplicationThread(SocketChannel socketChannel) {
+        public ReplicationWorker(SocketChannel socketChannel) {
             this.socketChannel = socketChannel;
-            inBuffer = ByteBuffer.allocate(INTIAL_BUFFER_SIZE);
-            outBuffer = ByteBuffer.allocate(INTIAL_BUFFER_SIZE);
-            remoteLog = new LogRecord();
+            inBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE);
+            outBuffer = ByteBuffer.allocate(ReplicationProtocol.INITIAL_BUFFER_SIZE);
         }
 
         @Override
         public void run() {
-            Thread.currentThread().setName("Replication Thread");
+            final String oldName = Thread.currentThread().getName();
+            Thread.currentThread().setName("Replication Worker");
             try {
-                ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
-                        inBuffer);
-                while (replicationFunction != ReplicationRequestType.GOODBYE) {
-                    switch (replicationFunction) {
-                        case REPLICATE_LOG:
-                            handleLogReplication();
-                            break;
-                        case LSM_COMPONENT_PROPERTIES:
-                            handleLSMComponentProperties();
-                            break;
-                        case REPLICATE_FILE:
-                            handleReplicateFile();
-                            break;
-                        case DELETE_FILE:
-                            handleDeleteFile();
-                            break;
-                        case REPLICA_EVENT:
-                            handleReplicaEvent();
-                            break;
-                        case GET_REPLICA_MAX_LSN:
-                            handleGetReplicaMaxLSN();
-                            break;
-                        case GET_REPLICA_FILES:
-                            handleGetReplicaFiles();
-                            break;
-                        case FLUSH_INDEX:
-                            handleFlushIndex();
-                            break;
-                        case PARTITION_RESOURCES_REQUEST:
-                            handleGetPartitionResources();
-                            break;
-                        case REPLICATE_RESOURCE_FILE:
-                            handleReplicateResourceFile();
-                            break;
-                        case DELETE_RESOURCE_FILE:
-                            handleDeleteResourceFile();
-                            break;
-                        case CHECKPOINT_PARTITION:
-                            handleCheckpointPartition();
-                            break;
-                        default:
-                            throw new IllegalStateException("Unknown replication request");
-                    }
-                    replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
+                ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
+                while (requestType != ReplicationRequestType.GOODBYE) {
+                    handle(requestType);
+                    requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 }
             } catch (Exception e) {
                 LOGGER.warn("Unexpectedly error during replication.", e);
@@ -303,261 +121,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                     try {
                         socketChannel.close();
                     } catch (IOException e) {
-                        LOGGER.warn("Filed to close replication socket.", e);
-                    }
-                }
-            }
-        }
-
-        private void handleFlushIndex() throws IOException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            //read which indexes are requested to be flushed from remote replica
-            ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
-            Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
-
-            /**
-             * check which indexes can be flushed (open indexes) and which cannot be
-             * flushed (closed or have empty memory component).
-             */
-            IDatasetLifecycleManager datasetLifeCycleManager = appContextProvider.getDatasetLifecycleManager();
-            List<IndexInfo> openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo();
-            Set<Integer> datasetsToForceFlush = new HashSet<>();
-            for (IndexInfo iInfo : openIndexesInfo) {
-                if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
-                    AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
-                            .getIOOperationCallback();
-                    //if an index has a pending flush, then the request to flush it will succeed.
-                    if (ioCallback.hasPendingFlush()) {
-                        //remove index to indicate that it will be flushed
-                        requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
-                    } else if (!((AbstractLSMIndex) iInfo.getIndex()).isCurrentMutableComponentEmpty()) {
-                        /**
-                         * if an index has something to be flushed, then the request to flush it
-                         * will succeed and we need to schedule it to be flushed.
-                         */
-                        datasetsToForceFlush.add(iInfo.getDatasetId());
-                        //remove index to indicate that it will be flushed
-                        requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
+                        LOGGER.warn("Failed to close replication socket.", e);
                     }
                 }
-            }
-
-            //schedule flush for datasets requested to be flushed
-            for (int datasetId : datasetsToForceFlush) {
-                datasetLifeCycleManager.flushDataset(datasetId, true);
-            }
-
-            //the remaining indexes in the requested set are those which cannot be flushed.
-            //respond back to the requester that those indexes cannot be flushed
-            ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
-            outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
-            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-        }
-
-        private void handleLSMComponentProperties() throws IOException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(inBuffer);
-            //create mask to indicate that this component is not valid yet
-            replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp);
-            lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp);
-        }
-
-        private void handleReplicateFile() throws IOException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
-
-            //get index path
-            String indexPath = replicaResourcesManager.getIndexPath(afp);
-            String replicaFilePath = indexPath + File.separator + afp.getFileName();
-
-            //create file
-            File destFile = new File(replicaFilePath);
-            destFile.createNewFile();
-
-            try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
-                    FileChannel fileChannel = fileOutputStream.getChannel()) {
-                fileOutputStream.setLength(afp.getFileSize());
-                NetworkingUtil.downloadFile(fileChannel, socketChannel);
-                fileChannel.force(true);
-
-                if (afp.requiresAck()) {
-                    ReplicationProtocol.sendAck(socketChannel);
-                }
-                if (afp.isLSMComponentFile()) {
-                    String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
-                    final LSMComponentProperties lsmComponentProperties = lsmComponentId2PropertiesMap.get(componentId);
-                    // merge operations do not generate flush logs
-                    if (afp.requiresAck() && lsmComponentProperties.getOpType() == LSMOperationType.FLUSH) {
-                        LSMComponentLSNSyncTask syncTask =
-                                new LSMComponentLSNSyncTask(componentId, destFile.getAbsolutePath());
-                        lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
-                    } else {
-                        updateLSMComponentRemainingFiles(componentId);
-                    }
-                } else {
-                    //index metadata file
-                    final ResourceReference indexRef = ResourceReference.of(destFile.getAbsolutePath());
-                    indexCheckpointManagerProvider.get(indexRef).init(logManager.getAppendLSN());
-                }
-            }
-        }
-
-        private void handleGetReplicaMaxLSN() throws IOException {
-            long maxLNS = logManager.getAppendLSN();
-            outBuffer.clear();
-            outBuffer.putLong(maxLNS);
-            outBuffer.flip();
-            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-        }
-
-        private void handleGetReplicaFiles() throws IOException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
-
-            LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
-
-            List<String> filesList;
-            Set<Integer> partitionIds = request.getPartitionIds();
-            Set<String> requesterExistingFiles = request.getExistingFiles();
-            Map<Integer, ClusterPartition> clusterPartitions = appContextProvider.getAppContext()
-                    .getMetadataProperties().getClusterPartitions();
-
-            // Flush replicated datasets to generate the latest LSM components
-            dsLifecycleManager.flushDataset(replicationStrategy);
-            for (Integer partitionId : partitionIds) {
-                ClusterPartition partition = clusterPartitions.get(partitionId);
-                filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false);
-                //start sending files
-                for (String filePath : filesList) {
-                    // Send only files of datasets that are replciated.
-                    DatasetResourceReference indexFileRef = localResourceRep.getLocalResourceReference(filePath);
-                    if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
-                        continue;
-                    }
-                    String relativeFilePath = StoragePathUtil.getIndexFileRelativePath(filePath);
-                    //if the file already exists on the requester, skip it
-                    if (!requesterExistingFiles.contains(relativeFilePath)) {
-                        try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
-                                FileChannel fileChannel = fromFile.getChannel();) {
-                            long fileSize = fileChannel.size();
-                            fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false, false);
-                            outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
-                                    ReplicationRequestType.REPLICATE_FILE);
-
-                            //send file info
-                            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-
-                            //transfer file
-                            NetworkingUtil.sendFile(fileChannel, socketChannel);
-                        }
-                    }
-                }
-            }
-
-            //send goodbye (end of files)
-            ReplicationProtocol.sendGoodbye(socketChannel);
-        }
-
-        private void handleReplicaEvent() throws IOException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
-            replicationManager.reportReplicaEvent(event);
-        }
-
-        private void handleDeleteFile() throws IOException {
-            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-            LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
-            replicaResourcesManager.deleteIndexFile(fileProp);
-            if (fileProp.requiresAck()) {
-                ReplicationProtocol.sendAck(socketChannel);
-            }
-        }
-
-        private void handleLogReplication() throws IOException, ACIDException {
-            //set initial buffer size to a log buffer page size
-            inBuffer = ByteBuffer.allocate(logManager.getLogPageSize());
-            while (true) {
-                //read a batch of logs
-                inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
-                //check if it is end of handshake (a single byte log)
-                if (inBuffer.remaining() == LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE) {
-                    break;
-                }
-
-                processLogsBatch(inBuffer);
-            }
-        }
-
-        private void processLogsBatch(ByteBuffer buffer) throws ACIDException {
-            while (buffer.hasRemaining()) {
-                //get rid of log size
-                inBuffer.getInt();
-                //Deserialize log
-                remoteLog.readRemoteLog(inBuffer);
-                remoteLog.setLogSource(LogSource.REMOTE);
-
-                switch (remoteLog.getLogType()) {
-                    case LogType.UPDATE:
-                    case LogType.ENTITY_COMMIT:
-                        logManager.log(remoteLog);
-                        break;
-                    case LogType.JOB_COMMIT:
-                    case LogType.ABORT:
-                        LogRecord jobTerminationLog = new LogRecord();
-                        TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, remoteLog.getTxnId(),
-                                remoteLog.getLogType() == LogType.JOB_COMMIT);
-                        jobTerminationLog.setReplicationThread(this);
-                        jobTerminationLog.setLogSource(LogSource.REMOTE);
-                        logManager.log(jobTerminationLog);
-                        break;
-                    case LogType.FLUSH:
-                        //store mapping information for flush logs to use them in incoming LSM components.
-                        RemoteLogMapping flushLogMap = new RemoteLogMapping();
-                        LogRecord flushLog = new LogRecord();
-                        TransactionUtil.formFlushLogRecord(flushLog, remoteLog.getDatasetId(), null,
-                                remoteLog.getNodeId(), remoteLog.getNumOfFlushedIndexes());
-                        flushLog.setReplicationThread(this);
-                        flushLog.setLogSource(LogSource.REMOTE);
-                        flushLogMap.setRemoteNodeID(remoteLog.getNodeId());
-                        flushLogMap.setRemoteLSN(remoteLog.getLSN());
-                        synchronized (localLsn2RemoteMapping) {
-                            logManager.log(flushLog);
-                            //the log LSN value is updated by logManager.log(.) to a local value
-                            flushLogMap.setLocalLSN(flushLog.getLSN());
-                            flushLogMap.numOfFlushedIndexes.set(flushLog.getNumOfFlushedIndexes());
-                            replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
-                            localLsn2RemoteMapping.put(flushLog.getLSN(), flushLogMap);
-                        }
-                        synchronized (flushLogslock) {
-                            flushLogslock.notify();
-                        }
-                        break;
-                    default:
-                        LOGGER.error("Unsupported LogType: " + remoteLog.getLogType());
-                }
-            }
-        }
-
-        /**
-         * this method is called sequentially by {@link LogBuffer#notifyReplicationTermination()}
-         * for JOB_COMMIT, JOB_ABORT, and FLUSH log types.
-         */
-        @Override
-        public void notifyLogReplicationRequester(LogRecord logRecord) {
-            switch (logRecord.getLogType()) {
-                case LogType.JOB_COMMIT:
-                case LogType.ABORT:
-                    pendingNotificationRemoteLogsQ.offer(logRecord);
-                    break;
-                case LogType.FLUSH:
-                    final RemoteLogMapping remoteLogMapping;
-                    synchronized (localLsn2RemoteMapping) {
-                        remoteLogMapping = localLsn2RemoteMapping.remove(logRecord.getLSN());
-                    }
-                    checkpointReplicaIndexes(remoteLogMapping, logRecord.getDatasetId());
-                    break;
-                default:
-                    throw new IllegalStateException("Unexpected log type: " + logRecord.getLogType());
+                Thread.currentThread().setName(oldName);
             }
         }
 
@@ -571,115 +138,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             return outBuffer;
         }
 
-        private void checkpointReplicaIndexes(RemoteLogMapping remoteLogMapping, int datasetId) {
-            try {
-                Predicate<LocalResource> replicaIndexesPredicate = lr -> {
-                    DatasetLocalResource dls = (DatasetLocalResource) lr.getResource();
-                    return dls.getDatasetId() == datasetId && !localResourceRep.getActivePartitions()
-                            .contains(dls.getPartition());
-                };
-                final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate);
-                final List<DatasetResourceReference> replicaIndexesRef =
-                        resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
-                for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
-                    final IIndexCheckpointManager indexCheckpointManager =
-                            indexCheckpointManagerProvider.get(replicaIndexRef);
-                    synchronized (indexCheckpointManager) {
-                        indexCheckpointManager
-                                .masterFlush(remoteLogMapping.getRemoteLSN(), remoteLogMapping.getLocalLSN());
-                    }
-                }
-            } catch (Exception e) {
-                LOGGER.error("Failed to checkpoint replica indexes", e);
-            }
-        }
-
-        private void handleGetPartitionResources() throws IOException {
-            final PartitionResourcesListTask task = (PartitionResourcesListTask) ReplicationProtocol
-                    .readMessage(ReplicationRequestType.PARTITION_RESOURCES_REQUEST, socketChannel, inBuffer);
-            task.perform(appCtx, this);
-        }
-
-        private void handleReplicateResourceFile() throws HyracksDataException {
-            ReplicateFileTask task = (ReplicateFileTask) ReplicationProtocol
-                    .readMessage(ReplicationRequestType.REPLICATE_RESOURCE_FILE, socketChannel, inBuffer);
+        private void handle(ReplicationRequestType requestType) throws HyracksDataException {
+            final IReplicaTask task =
+                    (IReplicaTask) ReplicationProtocol.readMessage(requestType, socketChannel, inBuffer);
             task.perform(appCtx, this);
         }
-
-        private void handleDeleteResourceFile() throws HyracksDataException {
-            DeleteFileTask task = (DeleteFileTask) ReplicationProtocol
-                    .readMessage(ReplicationRequestType.DELETE_RESOURCE_FILE, socketChannel, inBuffer);
-            task.perform(appCtx, this);
-        }
-
-        private void handleCheckpointPartition() throws HyracksDataException {
-            CheckpointPartitionIndexesTask task = (CheckpointPartitionIndexesTask) ReplicationProtocol
-                    .readMessage(ReplicationRequestType.CHECKPOINT_PARTITION, socketChannel, inBuffer);
-            task.perform(appCtx, this);
-        }
-    }
-
-    /**
-     * This thread is responsible for sending JOB_COMMIT/ABORT ACKs to replication clients.
-     */
-    private class ReplicationNotifier extends Thread {
-        @Override
-        public void run() {
-            Thread.currentThread().setName("ReplicationNotifier Thread");
-            while (true) {
-                try {
-                    LogRecord logRecord = pendingNotificationRemoteLogsQ.take();
-                    //send ACK to requester
-                    logRecord.getReplicationThread().getChannel().socket().getOutputStream()
-                            .write((localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getTxnId()
-                                    + System.lineSeparator()).getBytes());
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } catch (IOException e) {
-                    LOGGER.warn("Failed to send job replication ACK", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * This thread is responsible for synchronizing the LSN of
-     * the received LSM components to a local LSN.
-     */
-    private class LSMComponentsSyncService extends Thread {
-
-        @Override
-        public void run() {
-            Thread.currentThread().setName("LSMComponentsSyncService Thread");
-
-            while (true) {
-                try {
-                    LSMComponentLSNSyncTask syncTask = lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
-                    LSMComponentProperties lsmCompProp = lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
-                    syncLSMComponentFlushLSN(lsmCompProp, syncTask);
-                    updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                } catch (Exception e) {
-                    LOGGER.error("Unexpected exception during LSN synchronization", e);
-                }
-            }
-        }
-
-        private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask)
-                throws InterruptedException, IOException {
-            final String componentFilePath = syncTask.getComponentFilePath();
-            final ResourceReference indexRef = ResourceReference.of(componentFilePath);
-            final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(indexRef);
-            synchronized (indexCheckpointManager) {
-                long masterLsn = lsmCompProp.getOriginalLSN();
-                // wait until the lsn mapping is flushed to disk
-                while (!indexCheckpointManager.isFlushed(masterLsn)) {
-                    indexCheckpointManager.wait();
-                }
-                indexCheckpointManager
-                        .replicated(AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()), masterLsn);
-            }
-        }
     }
 }