You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/27 00:31:25 UTC

[2/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
deleted file mode 100644
index 8e020eb..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.functions;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
-import org.apache.asterix.replication.storage.LSMComponentProperties;
-
-public class AsterixReplicationProtocol {
-
-    /**
-     * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
-     */
-    public static final String JOB_COMMIT_ACK = "$";
-
-    public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
-    public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
-
-    /* 
-     * ReplicationRequestType:
-     * REPLICATE_LOG: txn log replication
-     * REPLICATE_FILE: replicate a file(s)
-     * DELETE_FILE: delete a file(s)
-     * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
-     * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
-     * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
-     * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
-     * UPDATE_REPLICA: used to update replica info such as IP Address change.
-     * GOODBYE: used to notify replicas that the replication request has been completed
-     * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
-     * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
-     * ACK: used to notify the requesting replica that the request has been completed successfully
-     * FLUSH_INDEX: request remote replica to flush an LSM component
-     */
-    public enum ReplicationRequestType {
-        REPLICATE_LOG,
-        REPLICATE_FILE,
-        DELETE_FILE,
-        GET_REPLICA_FILES,
-        GET_REPLICA_LOGS,
-        GET_REPLICA_MAX_LSN,
-        GET_REPLICA_MIN_LSN,
-        UPDATE_REPLICA,
-        GOODBYE,
-        REPLICA_EVENT,
-        LSM_COMPONENT_PROPERTIES,
-        ACK,
-        FLUSH_INDEX
-    }
-
-    public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
-        //read request size
-        NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
-        int requestSize = dataBuffer.getInt();
-
-        if (dataBuffer.capacity() < requestSize) {
-            dataBuffer = ByteBuffer.allocate(requestSize);
-        }
-
-        //read request
-        NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize);
-
-        return dataBuffer;
-    }
-
-    public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
-            throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        lsmCompProp.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
-        }
-        buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
-    }
-
-    public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
-            throws IOException {
-        //read replication request type
-        NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
-
-        ReplicationRequestType requestType = AsterixReplicationProtocol.ReplicationRequestType.values()[byteBuffer
-                .getInt()];
-        return requestType;
-    }
-
-    public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return LSMComponentProperties.create(dis);
-    }
-
-    public static ByteBuffer getGoodbyeBuffer() {
-        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
-        bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
-        bb.flip();
-        return bb;
-    }
-
-    public static ByteBuffer getAckBuffer() {
-        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
-        bb.putInt(ReplicationRequestType.ACK.ordinal());
-        bb.flip();
-        return bb;
-    }
-
-    public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
-        requestBuffer.clear();
-        //put request type (4 bytes)
-        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
-        //leave space for log size
-        requestBuffer.position(requestBuffer.position() + Integer.BYTES);
-        int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
-        //put request size (4 bytes)
-        requestBuffer.putInt(4, logSize);
-        requestBuffer.flip();
-    }
-
-    public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
-        requestBuffer.clear();
-        //put request type (4 bytes)
-        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
-        //length of the log
-        requestBuffer.putInt(serializedLog.length);
-        //the log itself
-        requestBuffer.put(serializedLog);
-        requestBuffer.flip();
-    }
-
-    public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, AsterixLSMIndexFileProperties afp,
-            ReplicationRequestType requestType) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        afp.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (requestBuffer.capacity() < requestSize) {
-            requestBuffer = ByteBuffer.allocate(requestSize);
-        } else {
-            requestBuffer.clear();
-        }
-        requestBuffer.putInt(requestType.ordinal());
-        requestBuffer.putInt(oos.size());
-        requestBuffer.put(outputStream.toByteArray());
-        requestBuffer.flip();
-        return requestBuffer;
-    }
-
-    public static AsterixLSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return AsterixLSMIndexFileProperties.create(dis);
-    }
-
-    public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaLogsRequest.create(dis);
-    }
-
-    public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
-            throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (requestBuffer.capacity() < requestSize) {
-            requestBuffer = ByteBuffer.allocate(requestSize);
-        } else {
-            requestBuffer.clear();
-        }
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
-        requestBuffer.putInt(oos.size());
-        requestBuffer.put(outputStream.toByteArray());
-        requestBuffer.flip();
-        return requestBuffer;
-    }
-
-    public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-
-        oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
-        replica.writeFields(oos);
-        oos.close();
-
-        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
-        buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        return buffer;
-    }
-
-    public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        event.serialize(oos);
-        oos.close();
-
-        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
-        buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
-    }
-
-    public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return Replica.create(dis);
-    }
-
-    public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-
-        return ReplicaEvent.create(dis);
-    }
-
-    public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
-        }
-        buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-    }
-
-    public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
-            throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
-        }
-        buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
-    }
-
-    public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaFilesRequest.create(dis);
-    }
-
-    public static ReplicaIndexFlushRequest readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaIndexFlushRequest.create(dis);
-    }
-
-    public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) {
-        requestBuffer.clear();
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal());
-        requestBuffer.flip();
-    }
-
-    public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
-        requestBuffer.clear();
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
-        requestBuffer.flip();
-    }
-
-    public static int getJobIdFromLogAckMessage(String msg) {
-        return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
-    }
-
-    public static String getNodeIdFromLogAckMessage(String msg) {
-        return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
-    }
-
-    /**
-     * Sends a goodbye request to a remote replica indicating the end of a replication request.
-     * 
-     * @param socketChannel
-     *            the remote replica socket.
-     * @throws IOException
-     */
-    public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
-        ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
-        NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
-    }
-
-    public static void sendAck(SocketChannel socketChannel) throws IOException {
-        ByteBuffer ackBuffer = AsterixReplicationProtocol.getAckBuffer();
-        NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
new file mode 100644
index 0000000..be8f8e3
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.functions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+
+public class ReplicationProtocol {
+
+    /**
+     * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
+     */
+    public static final String JOB_COMMIT_ACK = "$";
+
+    public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+    public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+
+    /* 
+     * ReplicationRequestType:
+     * REPLICATE_LOG: txn log replication
+     * REPLICATE_FILE: replicate a file(s)
+     * DELETE_FILE: delete a file(s)
+     * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
+     * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
+     * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
+     * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
+     * UPDATE_REPLICA: used to update replica info such as IP Address change.
+     * GOODBYE: used to notify replicas that the replication request has been completed
+     * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
+     * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
+     * ACK: used to notify the requesting replica that the request has been completed successfully
+     * FLUSH_INDEX: request remote replica to flush an LSM component
+     */
+    public enum ReplicationRequestType {
+        REPLICATE_LOG,
+        REPLICATE_FILE,
+        DELETE_FILE,
+        GET_REPLICA_FILES,
+        GET_REPLICA_LOGS,
+        GET_REPLICA_MAX_LSN,
+        GET_REPLICA_MIN_LSN,
+        UPDATE_REPLICA,
+        GOODBYE,
+        REPLICA_EVENT,
+        LSM_COMPONENT_PROPERTIES,
+        ACK,
+        FLUSH_INDEX
+    }
+
+    public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
+        //read request size
+        NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
+        int requestSize = dataBuffer.getInt();
+
+        if (dataBuffer.capacity() < requestSize) {
+            dataBuffer = ByteBuffer.allocate(requestSize);
+        }
+
+        //read request
+        NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize);
+
+        return dataBuffer;
+    }
+
+    public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
+            throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        lsmCompProp.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (buffer.capacity() < requestSize) {
+            buffer = ByteBuffer.allocate(requestSize);
+        } else {
+            buffer.clear();
+        }
+        buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+        return buffer;
+    }
+
+    public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
+            throws IOException {
+        //read replication request type
+        NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
+
+        ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer
+                .getInt()];
+        return requestType;
+    }
+
+    public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return LSMComponentProperties.create(dis);
+    }
+
+    public static ByteBuffer getGoodbyeBuffer() {
+        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+        bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
+        bb.flip();
+        return bb;
+    }
+
+    public static ByteBuffer getAckBuffer() {
+        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+        bb.putInt(ReplicationRequestType.ACK.ordinal());
+        bb.flip();
+        return bb;
+    }
+
+    public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
+        requestBuffer.clear();
+        //put request type (4 bytes)
+        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
+        //leave space for log size
+        requestBuffer.position(requestBuffer.position() + Integer.BYTES);
+        int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
+        //put request size (4 bytes)
+        requestBuffer.putInt(4, logSize);
+        requestBuffer.flip();
+    }
+
+    public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
+        requestBuffer.clear();
+        //put request type (4 bytes)
+        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
+        //length of the log
+        requestBuffer.putInt(serializedLog.length);
+        //the log itself
+        requestBuffer.put(serializedLog);
+        requestBuffer.flip();
+    }
+
+    public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
+            ReplicationRequestType requestType) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        afp.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (requestBuffer.capacity() < requestSize) {
+            requestBuffer = ByteBuffer.allocate(requestSize);
+        } else {
+            requestBuffer.clear();
+        }
+        requestBuffer.putInt(requestType.ordinal());
+        requestBuffer.putInt(oos.size());
+        requestBuffer.put(outputStream.toByteArray());
+        requestBuffer.flip();
+        return requestBuffer;
+    }
+
+    public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return LSMIndexFileProperties.create(dis);
+    }
+
+    public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return ReplicaLogsRequest.create(dis);
+    }
+
+    public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
+            throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        request.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (requestBuffer.capacity() < requestSize) {
+            requestBuffer = ByteBuffer.allocate(requestSize);
+        } else {
+            requestBuffer.clear();
+        }
+        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
+        requestBuffer.putInt(oos.size());
+        requestBuffer.put(outputStream.toByteArray());
+        requestBuffer.flip();
+        return requestBuffer;
+    }
+
+    public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+
+        oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
+        replica.writeFields(oos);
+        oos.close();
+
+        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+        buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        return buffer;
+    }
+
+    public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        event.serialize(oos);
+        oos.close();
+
+        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+        buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return Replica.create(dis);
+    }
+
+    public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+
+        return ReplicaEvent.create(dis);
+    }
+
+    public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        request.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (buffer.capacity() < requestSize) {
+            buffer = ByteBuffer.allocate(requestSize);
+        } else {
+            buffer.clear();
+        }
+        buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+    }
+
+    public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
+            throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        request.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (buffer.capacity() < requestSize) {
+            buffer = ByteBuffer.allocate(requestSize);
+        } else {
+            buffer.clear();
+        }
+        buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+        return buffer;
+    }
+
+    public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return ReplicaFilesRequest.create(dis);
+    }
+
+    public static ReplicaIndexFlushRequest readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return ReplicaIndexFlushRequest.create(dis);
+    }
+
+    public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) {
+        requestBuffer.clear();
+        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal());
+        requestBuffer.flip();
+    }
+
+    public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
+        requestBuffer.clear();
+        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
+        requestBuffer.flip();
+    }
+
+    public static int getJobIdFromLogAckMessage(String msg) {
+        return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
+    }
+
+    public static String getNodeIdFromLogAckMessage(String msg) {
+        return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
+    }
+
+    /**
+     * Sends a goodbye request to a remote replica indicating the end of a replication request.
+     * 
+     * @param socketChannel
+     *            the remote replica socket.
+     * @throws IOException
+     */
+    public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+        NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
+    }
+
+    public static void sendAck(SocketChannel socketChannel) throws IOException {
+        ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
+        NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 38be05e..a7cfaec 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -27,7 +27,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.management.ReplicationManager;
 
@@ -53,7 +53,7 @@ public class ReplicationLogBuffer {
     }
 
     public void append(ILogRecord logRecord) {
-        appendBuffer.putInt(AsterixReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+        appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
         appendBuffer.putInt(logRecord.getSerializedLogSize());
         appendBuffer.put(logRecord.getSerializedLog());
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
index 633d87a..9915c83 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
@@ -30,7 +30,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 
 public class ReplicaEventNotifier implements Runnable {
 
@@ -61,7 +61,7 @@ public class ReplicaEventNotifier implements Runnable {
 
         ByteBuffer buffer = null;
         try {
-            buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+            buffer = ReplicationProtocol.writeReplicaEventRequest(event);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -79,7 +79,7 @@ public class ReplicaEventNotifier implements Runnable {
                     //send replica event
                     connection.write(buffer);
                     //send goodbye
-                    connection.write(AsterixReplicationProtocol.getGoodbyeBuffer());
+                    connection.write(ReplicationProtocol.getGoodbyeBuffer());
                     break;
                 } catch (IOException | UnresolvedAddressException e) {
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index 07ed144..0c94c61 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Callable;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 
 public class ReplicaStateChecker implements Callable<Void> {
 
@@ -61,7 +61,7 @@ public class ReplicaStateChecker implements Callable<Void> {
                 connection = SocketChannel.open();
                 connection.configureBlocking(true);
                 connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
-                ByteBuffer buffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+                ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
                 connection.write(buffer);
                 replicationManager.updateReplicaState(replica.getId(), ReplicaState.ACTIVE, suspendReplication);
                 return null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index e6b2ebf..c97fe94 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -42,7 +42,9 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -59,15 +61,15 @@ import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.logging.RemoteLogMapping;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
 import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
@@ -210,7 +212,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         public void run() {
             Thread.currentThread().setName("Replication Thread");
             try {
-                ReplicationRequestType replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+                ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
                         inBuffer);
                 while (replicationFunction != ReplicationRequestType.GOODBYE) {
                     switch (replicationFunction) {
@@ -251,7 +253,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                             throw new IllegalStateException("Unknown replication request");
                         }
                     }
-                    replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel, inBuffer);
+                    replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -267,9 +269,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         }
 
         private void handleFlushIndex() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
             //1. read which indexes are requested to be flushed from remote replica
-            ReplicaIndexFlushRequest request = AsterixReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
+            ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
             Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
 
             //2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
@@ -302,26 +304,25 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             //the remaining indexes in the requested set are those which cannot be flushed.
             //4. respond back to the requester that those indexes cannot be flushed
             ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
-            outBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
+            outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
             NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
         }
 
         private void handleLSMComponentProperties() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            LSMComponentProperties lsmCompProp = AsterixReplicationProtocol.readLSMPropertiesRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(inBuffer);
             //create mask to indicate that this component is not valid yet
             replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp);
             lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp);
         }
 
         private void handleReplicateFile() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            AsterixLSMIndexFileProperties afp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
 
-            String replicaFolderPath = replicaResourcesManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(),
-                    afp.getDataverse(), afp.getIdxName());
-
-            String replicaFilePath = replicaFolderPath + File.separator + afp.getFileName();
+            //get index path
+            String indexPath = replicaResourcesManager.getIndexPath(afp);
+            String replicaFilePath = indexPath + File.separator + afp.getFileName();
 
             //create file
             File destFile = new File(replicaFilePath);
@@ -334,20 +335,20 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 fileChannel.force(true);
 
                 if (afp.requiresAck()) {
-                    AsterixReplicationProtocol.sendAck(socketChannel);
+                    ReplicationProtocol.sendAck(socketChannel);
                 }
                 if (afp.isLSMComponentFile()) {
-                    String compoentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath(), afp.getNodeId());
+                    String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
                     if (afp.getLSNByteOffset() != IMetaDataPageManager.INVALID_LSN_OFFSET) {
-                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(compoentId,
+                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId,
                                 destFile.getAbsolutePath(), afp.getLSNByteOffset());
                         lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
                     } else {
-                        updateLSMComponentRemainingFiles(compoentId);
+                        updateLSMComponentRemainingFiles(componentId);
                     }
                 } else {
                     //index metadata file
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(replicaFolderPath, logManager.getAppendLSN());
+                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
                 }
             }
         }
@@ -370,43 +371,48 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         }
 
         private void handleGetReplicaFiles() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaFilesRequest request = AsterixReplicationProtocol.readReplicaFileRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
 
-            AsterixLSMIndexFileProperties fileProperties = new AsterixLSMIndexFileProperties();
+            LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
 
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
-
+            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+                    .getAppContext()).getMetadataProperties().getNodePartitions();
             for (String replicaId : replicaIds) {
-                filesList = replicaResourcesManager.getResourcesForReplica(replicaId);
-
-                //start sending files
-                for (String filePath : filesList) {
-                    try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
-                            FileChannel fileChannel = fromFile.getChannel();) {
-                        long fileSize = fileChannel.size();
-                        fileProperties.initialize(filePath, fileSize, replicaId, false,
-                                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
-                        outBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
-                                ReplicationRequestType.REPLICATE_FILE);
-
-                        //send file info
-                        NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-
-                        //transfer file
-                        NetworkingUtil.sendFile(fileChannel, socketChannel);
+                //get replica partitions
+                ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
+                for (ClusterPartition partition : replicaPatitions) {
+                    filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId());
+
+                    //start sending files
+                    for (String filePath : filesList) {
+                        try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
+                                FileChannel fileChannel = fromFile.getChannel();) {
+                            long fileSize = fileChannel.size();
+                            fileProperties.initialize(filePath, fileSize, replicaId, false,
+                                    IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+                            outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer,
+                                    fileProperties, ReplicationRequestType.REPLICATE_FILE);
+
+                            //send file info
+                            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+
+                            //transfer file
+                            NetworkingUtil.sendFile(fileChannel, socketChannel);
+                        }
                     }
                 }
             }
 
             //send goodbye (end of files)
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         private void handleGetRemoteLogs() throws IOException, ACIDException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaLogsRequest request = AsterixReplicationProtocol.readReplicaLogsRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
 
             Set<String> replicaIds = request.getReplicaIds();
             long fromLSN = request.getFromLSN();
@@ -433,13 +439,13 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                     if (replicaIds.contains(logRecord.getNodeId()) && logRecord.getLogType() != LogType.FLUSH) {
                         if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
                             int requestSize = logRecord.getSerializedLogSize()
-                                    + AsterixReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
+                                    + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
                             outBuffer = ByteBuffer.allocate(requestSize);
                         }
 
                         //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
                         logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
-                        AsterixReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
+                        ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
                         NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
                     }
                     logRecord = logReader.next();
@@ -449,32 +455,32 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             }
 
             //send goodbye (end of logs)
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         private void handleUpdateReplica() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            Replica replica = AsterixReplicationProtocol.readReplicaUpdateRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            Replica replica = ReplicationProtocol.readReplicaUpdateRequest(inBuffer);
             replicationManager.updateReplicaInfo(replica);
         }
 
         private void handleReplicaEvent() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaEvent event = AsterixReplicationProtocol.readReplicaEventRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
             replicationManager.reportReplicaEvent(event);
         }
 
         private void handleDeleteFile() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            AsterixLSMIndexFileProperties fileProp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
-            replicaResourcesManager.deleteRemoteFile(fileProp);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
+            replicaResourcesManager.deleteIndexFile(fileProp);
             if (fileProp.requiresAck()) {
-                AsterixReplicationProtocol.sendAck(socketChannel);
+                ReplicationProtocol.sendAck(socketChannel);
             }
         }
 
         private void handleLogReplication() throws IOException, ACIDException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
 
             //Deserialize log
             remoteLog.readRemoteLog(inBuffer, false, localNodeID);
@@ -518,7 +524,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 //send ACK to requester
                 try {
                     socketChannel.socket().getOutputStream().write(
-                            (localNodeID + AsterixReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
+                            (localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
                                     .getBytes());
                     socketChannel.socket().getOutputStream().flush();
                 } catch (IOException e) {
@@ -625,6 +631,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             }
 
             File destFile = new File(syncTask.getComponentFilePath());
+            //prepare local LSN buffer
             ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
             metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
             metadataBuffer.flip();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 36e5dff..5c35df4 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -66,17 +66,16 @@ import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
 import org.apache.asterix.replication.logging.ReplicationLogFlusher;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
@@ -129,7 +128,8 @@ public class ReplicationManager implements IReplicationManager {
     private ReplicationLogFlusher txnlogsReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
     private Map<String, SocketChannel> logsReplicaSockets = null;
-
+    //TODO this class needs to be refactored by moving its private classes to separate files
+    //and possibly using MessageBroker to send/receive remote replicas events.
     public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties,
             IReplicaResourcesManager remoteResoucesManager, ILogManager logManager,
             IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
@@ -255,7 +255,7 @@ public class ReplicationManager implements IReplicationManager {
             throws IOException {
         boolean isLSMComponentFile;
         ByteBuffer responseBuffer = null;
-        AsterixLSMIndexFileProperties asterixFileProperties = new AsterixLSMIndexFileProperties();
+        LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
         if (requestBuffer == null) {
             requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         }
@@ -277,7 +277,7 @@ public class ReplicationManager implements IReplicationManager {
                         //send LSMComponent properties
                         LSMComponentJob = (ILSMIndexReplicationJob) job;
                         LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId);
-                        requestBuffer = AsterixReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
+                        requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
                                 requestBuffer);
                         sendRequest(replicasSockets, requestBuffer);
                     }
@@ -310,7 +310,7 @@ public class ReplicationManager implements IReplicationManager {
                                         IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
                             }
 
-                            requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer,
+                            requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
                                     asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
 
                             Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -350,7 +350,7 @@ public class ReplicationManager implements IReplicationManager {
                     remainingFiles--;
                     asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
                             IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
-                    AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
+                    ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
                             ReplicationRequestType.DELETE_FILE);
 
                     Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -392,13 +392,13 @@ public class ReplicationManager implements IReplicationManager {
     private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer)
             throws IOException {
         if (responseBuffer == null) {
-            responseBuffer = ByteBuffer.allocate(AsterixReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
+            responseBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
         } else {
             responseBuffer.clear();
         }
 
         //read response from remote replicas
-        ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
                 responseBuffer);
         return responseFunction;
     }
@@ -519,7 +519,7 @@ public class ReplicationManager implements IReplicationManager {
         node.setClusterIp(newAddress);
         Replica replica = new Replica(node);
 
-        ByteBuffer buffer = AsterixReplicationProtocol.writeUpdateReplicaRequest(replica);
+        ByteBuffer buffer = ReplicationProtocol.writeUpdateReplicaRequest(replica);
         Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
         sendRequest(replicaSockets, buffer);
         closeReplicaSockets(replicaSockets);
@@ -537,7 +537,7 @@ public class ReplicationManager implements IReplicationManager {
         node.setClusterIp(NetworkingUtil.getHostAddress(hostIPAddressFirstOctet));
         Replica replica = new Replica(node);
         ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.SHUTDOWN);
-        ByteBuffer buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+        ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event);
         Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
         sendRequest(replicaSockets, buffer);
         closeReplicaSockets(replicaSockets);
@@ -581,7 +581,7 @@ public class ReplicationManager implements IReplicationManager {
      */
     private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) {
         //send goodbye
-        ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
         sendRequest(replicaSockets, goodbyeBuffer);
 
         Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
@@ -910,7 +910,7 @@ public class ReplicationManager implements IReplicationManager {
         ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         for (String replicaId : replicaIds) {
             //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
-            HashMap<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
+            Map<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
                     nonSharpCheckpointTargetLSN);
 
             if (laggingIndexes.size() > 0) {
@@ -919,7 +919,7 @@ public class ReplicationManager implements IReplicationManager {
                 try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
                     ReplicaIndexFlushRequest laggingIndexesRequest = new ReplicaIndexFlushRequest(
                             laggingIndexes.keySet());
-                    requestBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
+                    requestBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
                             laggingIndexesRequest);
                     NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer);
 
@@ -927,19 +927,19 @@ public class ReplicationManager implements IReplicationManager {
                     ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer);
 
                     if (responseFunction == ReplicationRequestType.FLUSH_INDEX) {
-                        requestBuffer = AsterixReplicationProtocol.readRequest(socketChannel, requestBuffer);
+                        requestBuffer = ReplicationProtocol.readRequest(socketChannel, requestBuffer);
                         //returning the indexes that were not flushed
-                        laggingIndexesResponse = AsterixReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
+                        laggingIndexesResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
                     }
                     //send goodbye
-                    AsterixReplicationProtocol.sendGoodbye(socketChannel);
+                    ReplicationProtocol.sendGoodbye(socketChannel);
                 }
 
                 //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
                 if (laggingIndexesResponse != null) {
                     for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
                         String indexPath = laggingIndexes.get(resouceId);
-                        HashMap<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
+                        Map<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
                         indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
                         replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
                     }
@@ -953,7 +953,7 @@ public class ReplicationManager implements IReplicationManager {
     public long getMaxRemoteLSN(Set<String> remoteReplicas) throws IOException {
         long maxRemoteLSN = 0;
 
-        AsterixReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
+        ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
         Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
         try {
             for (String replicaId : remoteReplicas) {
@@ -988,26 +988,26 @@ public class ReplicationManager implements IReplicationManager {
     @Override
     public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover) throws IOException {
         ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover);
-        AsterixReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
+        ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
 
         try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
 
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
 
-            String destFolder;
+            String indexPath;
             String destFilePath;
-
-            ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+            ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
                     dataBuffer);
-            AsterixLSMIndexFileProperties fileProperties;
+            LSMIndexFileProperties fileProperties;
             while (responseFunction != ReplicationRequestType.GOODBYE) {
-                dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
+
+                fileProperties = ReplicationProtocol.readFileReplicationRequest(dataBuffer);
 
-                fileProperties = AsterixReplicationProtocol.readFileReplicationRequest(dataBuffer);
-                destFolder = replicaResourcesManager.getIndexPath(fileProperties.getNodeId(),
-                        fileProperties.getIoDeviceNum(), fileProperties.getDataverse(), fileProperties.getIdxName());
-                destFilePath = destFolder + File.separator + fileProperties.getFileName();
+                //get index path
+                indexPath = replicaResourcesManager.getIndexPath(fileProperties);
+                destFilePath = indexPath + File.separator + fileProperties.getFileName();
 
                 //create file
                 File destFile = new File(destFilePath);
@@ -1024,14 +1024,14 @@ public class ReplicationManager implements IReplicationManager {
                 //we need to create LSN map for .metadata files that belong to remote replicas
                 if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
                     //replica index
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(destFolder, logManager.getAppendLSN());
+                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
                 }
 
-                responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+                responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
     }
 
@@ -1039,7 +1039,7 @@ public class ReplicationManager implements IReplicationManager {
     @Override
     public long requestReplicaMinLSN(String selectedReplicaId) throws IOException {
         long minLSN = 0;
-        AsterixReplicationProtocol.writeMinLSNRequest(dataBuffer);
+        ReplicationProtocol.writeMinLSNRequest(dataBuffer);
         try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) {
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
@@ -1049,7 +1049,7 @@ public class ReplicationManager implements IReplicationManager {
             minLSN = dataBuffer.getLong();
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         return minLSN;
@@ -1060,19 +1060,19 @@ public class ReplicationManager implements IReplicationManager {
     public ArrayList<ILogRecord> requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN)
             throws IOException, ACIDException {
         ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
-        dataBuffer = AsterixReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
+        dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
 
         try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
 
             //read response type
-            ReplicationRequestType responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+            ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
 
             ArrayList<ILogRecord> recoveryLogs = new ArrayList<ILogRecord>();
             ILogRecord logRecord = new LogRecord();
             while (responseType != ReplicationRequestType.GOODBYE) {
-                dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
                 logRecord.readRemoteLog(dataBuffer, true, nodeId);
 
                 if (logRecord.getNodeId().equals(nodeId)) {
@@ -1085,11 +1085,11 @@ public class ReplicationManager implements IReplicationManager {
                     logManager.log(logRecord);
                 }
 
-                responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+                responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
             return recoveryLogs;
         }
     }
@@ -1136,11 +1136,7 @@ public class ReplicationManager implements IReplicationManager {
             updateReplicaState(replicaId, ReplicaState.DEAD, true);
 
             //delete any invalid LSMComponents for this replica
-            try {
-                replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
-            }
+            replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
         }
 
         public void handleShutdownEvent(String replicaId) {
@@ -1237,8 +1233,8 @@ public class ReplicationManager implements IReplicationManager {
                         break;
                     }
                     //read ACK for job commit log
-                    String replicaId = AsterixReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
-                    int jobId = AsterixReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
+                    String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+                    int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
                     addAckToJob(jobId, replicaId);
                 }
             } catch (AsynchronousCloseException e1) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index a82b535..ee987f8 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.replication.recovery;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,10 +31,12 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
@@ -55,13 +58,20 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
     @Override
     public void performRemoteRecovery() {
+        //TODO this method needs to be adapted to perform failback when autoFailover is enabled.
+        //Currently we will not allow a node to perform remote recovery since another replica
+        //already tookover its workload and might not resync correctly if there are on on-going
+        //jobs on the replica.
+        if (AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled()) {
+            throw new IllegalStateException("Cannot perform remote recovery when auto failover is enabled.");
+        }
         //The whole remote recovery process should be atomic.
         //Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery).
         int maxRecoveryAttempts = 10;
         PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
                 .getLocalResourceRepository();
         while (true) {
-            //start recovery recovery steps
+            //start recovery steps
             try {
                 maxRecoveryAttempts--;
 
@@ -76,7 +86,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 int activeReplicasCount = replicationManager.getActiveReplicasCount();
 
                 if (activeReplicasCount == 0) {
-                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to performe remote recovery");
+                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
                 }
 
                 //2. clean any memory data that could've existed from previous failed recovery attempt
@@ -90,8 +100,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
 
                 //5. get max LSN from selected remote replicas
-                long maxRemoteLSN = 0;
-                maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
+                long maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
 
                 //6. force LogManager to start from a partition > maxLSN in selected remote replicas
                 logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
@@ -107,8 +116,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                     //2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node)
                     if (replicasDataToRecover.contains(logManager.getNodeId())) {
                         ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
-                                .initializeNewUniverse(
-                                        runtimeContext.getReplicaResourcesManager().getLocalStorageFolder());
+                                .initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
                         //initialize resource id factor to correct max resource id
                         runtimeContext.initializeResourceIdFactory();
                     }
@@ -140,7 +148,6 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
     }
 
     private Map<String, Set<String>> constructRemoteRecoveryPlan() {
-
         //1. identify which replicas reside in this node
         String localNodeId = logManager.getNodeId();
         Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
@@ -205,4 +212,14 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
         return recoveryList;
     }
+
+    @Override
+    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException {
+        long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
+        //reply logs > minLSN that belong to these partitions
+        //TODO (mhubail) currently we assume the logs for these partitions belong to the failed node
+        //this needs to be updated once log formats are updated to include the partition id
+        runtimeContext.getTransactionSubsystem().getRecoveryManager().replayPartitionsLogs(partitions, minLSN,
+                failedNode);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
deleted file mode 100644
index 67b39c4..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-
-public class AsterixFilesUtil {
-
-    public static void deleteFolder(String folderPath) throws IOException {
-        File folder = new File(folderPath);
-        if (folder.exists()) {
-            //delete files inside the folder
-            while (deleteDirecotryFiles(folderPath) != true) {
-                //if there is a file being written (e.g. LSM Component), wait and try again to delete the file
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e) {
-                    //ignore
-                }
-            }
-
-            //delete the folder itself
-            folder.delete();
-        }
-    }
-
-    private static boolean deleteDirecotryFiles(String dirPath) throws IOException {
-        try {
-            Path directory = Paths.get(dirPath);
-            Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
-                @Override
-                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                    Files.delete(file);
-                    return FileVisitResult.CONTINUE;
-                }
-
-                @Override
-                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                    Files.delete(dir);
-                    return FileVisitResult.CONTINUE;
-                }
-
-                @Override
-                public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
-                    return FileVisitResult.CONTINUE;
-                }
-
-            });
-            return true;
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-    }
-}