Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/02/18 10:54:31 UTC

[5/6] incubator-asterixdb git commit: Asterix NCs Failback Support

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
new file mode 100644
index 0000000..7909a35
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackRequestMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Set;
+
+public class PreparePartitionsFailbackRequestMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+    private boolean releaseMetadataNode = false;
+    private final String nodeID;
+
+    public PreparePartitionsFailbackRequestMessage(long planId, int requestId, String nodeId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.nodeID = nodeId;
+        this.partitions = partitions;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_REQUEST;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+
+    public boolean isReleaseMetadataNode() {
+        return releaseMetadataNode;
+    }
+
+    public void setReleaseMetadataNode(boolean releaseMetadataNode) {
+        this.releaseMetadataNode = releaseMetadataNode;
+    }
+
+    public String getNodeID() {
+        return nodeID;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Partitions: " + partitions);
+        sb.append(" releaseMetadataNode: " + releaseMetadataNode);
+        return sb.toString();
+    }
+}
\ No newline at end of file

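For context, a minimal sketch of how a coordinator might construct this request; the plan id, request id, node id, and partition numbers are illustrative (the real values come from NodeFailbackPlan, further down in this patch):

import java.util.HashSet;
import java.util.Set;

import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;

public class PreparePartitionsFailbackRequestSketch {
    public static void main(String[] args) {
        Set<Integer> partitions = new HashSet<>();
        partitions.add(0);
        partitions.add(2);
        PreparePartitionsFailbackRequestMessage msg =
                new PreparePartitionsFailbackRequestMessage(1L, 0, "nc2", partitions);
        // ask this participant to hand over the metadata node role as well
        msg.setReleaseMetadataNode(true);
        System.out.println(msg); // Plan ID: 1 Partitions: [0, 2] releaseMetadataNode: true
    }
}
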
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
new file mode 100644
index 0000000..6b058c7
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/PreparePartitionsFailbackResponseMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import java.util.Set;
+
+public class PreparePartitionsFailbackResponseMessage extends AbstractFailbackPlanMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Set<Integer> partitions;
+
+    public PreparePartitionsFailbackResponseMessage(long planId, int requestId, Set<Integer> partitions) {
+        super(planId, requestId);
+        this.partitions = partitions;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.PREPARE_PARTITIONS_FAILBACK_RESPONSE;
+    }
+
+    public Set<Integer> getPartitions() {
+        return partitions;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
new file mode 100644
index 0000000..28fd36f
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/ReplicaEventMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
+
+public class ReplicaEventMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final ClusterEventType event;
+    private final String nodeIPAddress;
+
+    public ReplicaEventMessage(String nodeId, String nodeIPAddress, ClusterEventType event) {
+        this.nodeId = nodeId;
+        this.nodeIPAddress = nodeIPAddress;
+        this.event = event;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.REPLICA_EVENT;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public ClusterEventType getEvent() {
+        return event;
+    }
+
+    public String getNodeIPAddress() {
+        return nodeIPAddress;
+    }
+}
\ No newline at end of file

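A small construction sketch; the ClusterEventType constants are not shown in this diff, so the event is resolved by ordinal here, the same way ReplicaEvent.create does further down:

import org.apache.asterix.common.messaging.ReplicaEventMessage;
import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;

public class ReplicaEventMessageSketch {
    public static ReplicaEventMessage forNode(String nodeId, String nodeIPAddress, int eventOrdinal) {
        // resolve the event generically; the concrete constants live in Hyracks
        ClusterEventType event = ClusterEventType.values()[eventOrdinal];
        return new ReplicaEventMessage(nodeId, nodeIPAddress, event);
    }
}
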
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
index abfa7d2..ad5234b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -22,15 +22,12 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
 
     private static final long serialVersionUID = 1L;
     private final Integer[] partitions;
-    private final String failedNode;
     private final long requestId;
     private final String nodeId;
 
-    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
-            Integer[] partitionsToTakeover) {
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
         this.requestId = requestId;
         this.nodeId = nodeId;
-        this.failedNode = failedNode;
         this.partitions = partitionsToTakeover;
     }
 
@@ -47,10 +44,6 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
         return requestId;
     }
 
-    public String getFailedNode() {
-        return failedNode;
-    }
-
     public String getNodeId() {
         return nodeId;
     }
@@ -60,7 +53,6 @@ public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage
         StringBuilder sb = new StringBuilder();
         sb.append("Request ID: " + requestId);
         sb.append(" Node ID: " + nodeId);
-        sb.append(" Failed Node: " + failedNode);
         sb.append(" Partitions: ");
         for (Integer partitionId : partitions) {
             sb.append(partitionId + ",");

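After this simplification the request carries only the destination node and its partitions; a construction sketch with illustrative values:

import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;

public class TakeoverRequestSketch {
    public static TakeoverPartitionsRequestMessage example() {
        // the failed node no longer travels with the request; the partitions identify the work
        Integer[] partitionsToTakeover = { 0, 1 };
        return new TakeoverPartitionsRequestMessage(1L, "nc2", partitionsToTakeover);
    }
}
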
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 57a0dae..5d2e263 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -30,7 +30,12 @@ public interface IApplicationMessage extends IMessage {
         TAKEOVER_PARTITIONS_REQUEST,
         TAKEOVER_PARTITIONS_RESPONSE,
         TAKEOVER_METADATA_NODE_REQUEST,
-        TAKEOVER_METADATA_NODE_RESPONSE
+        TAKEOVER_METADATA_NODE_RESPONSE,
+        PREPARE_PARTITIONS_FAILBACK_REQUEST,
+        PREPARE_PARTITIONS_FAILBACK_RESPONSE,
+        COMPLETE_FAILBACK_REQUEST,
+        COMPLETE_FAILBACK_RESPONSE,
+        REPLICA_EVENT
     }
 
     public abstract ApplicationMessageType getMessageType();

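A hypothetical receiver-side dispatch over the five new message types, to show where each fits in the failback protocol (the real handlers live in the NC/CC message brokers, which are not shown in this hunk):

import org.apache.asterix.common.messaging.api.IApplicationMessage;

public class FailbackMessageDispatchSketch {
    public static void dispatch(IApplicationMessage message) {
        switch (message.getMessageType()) {
            case PREPARE_PARTITIONS_FAILBACK_REQUEST:
                // participant NC: flush/close the listed partitions, possibly release the metadata node
                break;
            case PREPARE_PARTITIONS_FAILBACK_RESPONSE:
                // CC: mark the pending request completed in the NodeFailbackPlan
                break;
            case COMPLETE_FAILBACK_REQUEST:
                // failing-back NC: pull the remaining LSM component files
                break;
            case COMPLETE_FAILBACK_RESPONSE:
                // CC: conclude the plan
                break;
            case REPLICA_EVENT:
                // NC: forward the cluster event to the replication manager
                break;
            default:
                break;
        }
    }
}
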
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index ecc9494..6d5918f 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -36,6 +36,17 @@ public interface IRemoteRecoveryManager {
      * @throws IOException
      * @throws ACIDException
      */
-    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+    public void takeoverPartitons(Integer[] partitions) throws IOException, ACIDException;
 
+    /**
+     * Constructs a failback plan and requests LSM disk components from active remote replicas.
+     */
+    public void startFailbackProcess();
+
+    /**
+     * Requests the remaining LSM disk component files from active remote replicas.
+     *
+     * @throws IOException
+     */
+    public void completeFailbackProcess() throws IOException;
 }
\ No newline at end of file

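A minimal sketch of the intended call sequence on the failing-back NC; the wait step in the middle is elided since it is driven by CC messages:

import org.apache.asterix.common.replication.IRemoteRecoveryManager;

public class FailbackDriverSketch {
    public static void failback(IRemoteRecoveryManager remoteRecoveryManager) throws Exception {
        // Phase 1: build the failback plan and request LSM components from active replicas.
        remoteRecoveryManager.startFailbackProcess();
        // ... CC runs the prepare phase against the participants (see NodeFailbackPlan later in this patch) ...
        // Phase 2: request whatever LSM disk component files are still missing.
        remoteRecoveryManager.completeFailbackProcess();
    }
}
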
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index f13d300..e22fafc 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -23,14 +23,8 @@ import java.util.Set;
 public interface IReplicaResourcesManager {
 
     /**
-     * @param remoteNodes
-     * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
-     */
-    public long getMinRemoteLSN(Set<String> remoteNodes);
-
-    /**
      * @param partitions
      * @return the minimum LSN of all indexes that belong to {@code partitions}.
      */
-    public long getPartitionsMinLSN(Integer[] partitions);
+    public long getPartitionsMinLSN(Set<Integer> partitions);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index 76f8767..3fc2af0 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -54,12 +54,13 @@ public interface IReplicationManager extends IIOReplicationManager {
      *            Get logs that belong to those replicas.
      * @param fromLSN
      *            Low water mark for logs to be requested.
-     * @return The logs received that belong to the local node.
+     * @param recoveryLogsFile
+     *            a temporary file to store the logs required for recovery
      * @throws IOException
      * @throws ACIDException
      */
-    public ArrayList<ILogRecord> requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover,
-            long fromLSN) throws IOException, ACIDException;
+    public void requestReplicaLogs(String remoteReplicaId, Set<String> replicasDataToRecover, long fromLSN,
+            File recoveryLogsFile) throws IOException, ACIDException;
 
     /**
      * Requests LSM components files from a remote replica.
@@ -68,9 +69,12 @@ public interface IReplicationManager extends IIOReplicationManager {
      *            The replica id to send the request to.
      * @param replicasDataToRecover
      *            Get files that belong to those replicas.
+     * @param existingFiles
+     *            a list of already existing files on the requester
      * @throws IOException
      */
-    public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover) throws IOException;
+    public void requestReplicaFiles(String remoteReplicaId, Set<String> replicasDataToRecover,
+            Set<String> existingFiles) throws IOException;
 
     /**
      * Requests current maximum LSN from remote replicas.
@@ -83,13 +87,6 @@ public interface IReplicationManager extends IIOReplicationManager {
     public long getMaxRemoteLSN(Set<String> remoteReplicaIds) throws IOException;
 
     /**
-     * Sends the IP address of the local replica to all remote replicas.
-     *
-     * @throws IOException
-     */
-    public void broadcastNewIPAddress() throws IOException;
-
-    /**
      * @return The number of remote replicas that are in ACTIVE state.
      */
     public int getActiveReplicasCount();
@@ -146,5 +143,4 @@ public interface IReplicationManager extends IIOReplicationManager {
      * @throws IOException
      */
     public void requestFlushLaggingReplicaIndexes(long nonSharpCheckpointTargetLSN) throws IOException;
-
 }

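A sketch of a caller driving the reworked recovery API; the replica ids and temp file are illustrative. Note that requestReplicaLogs now spills the logs into a file rather than returning them in memory, which lines up with the TODO removed from LogRecord further down about spilling remote recovery logs to disk:

import java.io.File;
import java.util.HashSet;
import java.util.Set;

import org.apache.asterix.common.replication.IReplicationManager;

public class RemoteRecoverySketch {
    public static void recoverFrom(IReplicationManager replicationManager, String remoteReplicaId)
            throws Exception {
        Set<String> replicasDataToRecover = new HashSet<>();
        replicasDataToRecover.add("nc1"); // illustrative node id
        // files already present locally are skipped instead of re-shipped
        Set<String> existingFiles = new HashSet<>();
        replicationManager.requestReplicaFiles(remoteReplicaId, replicasDataToRecover, existingFiles);
        // spill the logs needed for replay into a temporary file
        long fromLSN = 0L; // in practice, the local low-water-mark LSN
        File recoveryLogsFile = File.createTempFile("recovery", ".log");
        replicationManager.requestReplicaLogs(remoteReplicaId, replicasDataToRecover, fromLSN, recoveryLogsFile);
        // the spilled logs are then replayed via IRecoveryManager.replayPartitionsLogs(...)
    }
}
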
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
new file mode 100644
index 0000000..0591644
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/NodeFailbackPlan.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
+import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
+
+public class NodeFailbackPlan {
+
+    public enum FailbackPlanState {
+        /**
+         * Initial state while selecting the nodes that will participate
+         * in the node failback plan.
+         */
+        PREPARING,
+        /**
+         * Once a pending {@link PreparePartitionsFailbackRequestMessage} request is added,
+         * the state is changed from PREPARING to PENDING_PARTICIPANT_REPONSE to indicate
+         * that a response is expected and must be awaited.
+         */
+        PENDING_PARTICIPANT_REPONSE,
+        /**
+         * Upon receiving the last {@link PreparePartitionsFailbackResponseMessage} response,
+         * the state changes from PENDING_PARTICIPANT_REPONSE to PENDING_COMPLETION to indicate
+         * the need to send {@link CompleteFailbackRequestMessage} to the failing back node.
+         */
+        PENDING_COMPLETION,
+        /**
+         * If any of the participants or the failing back node itself fails during
+         * any of these states (PREPARING, PENDING_PARTICIPANT_REPONSE, PENDING_COMPLETION),
+         * the state is changed to FAILED.
+         */
+        FAILED,
+        /**
+         * If the state is FAILED and all pending responses (if any) have been received,
+         * the state changes from FAILED to PENDING_ROLLBACK to indicate the need to revert
+         * the effects of this plan (if any).
+         */
+        PENDING_ROLLBACK
+    }
+
+    private static long planIdGenerator = 0;
+    private long planId;
+    private final String nodeId;
+    private final Set<String> participants;
+    private final Map<Integer, String> partition2nodeMap;
+    private String nodeToReleaseMetadataManager;
+    private int requestId;
+    private Map<Integer, PreparePartitionsFailbackRequestMessage> pendingRequests;
+    private FailbackPlanState state;
+
+    public static NodeFailbackPlan createPlan(String nodeId) {
+        return new NodeFailbackPlan(planIdGenerator++, nodeId);
+    }
+
+    private NodeFailbackPlan(long planId, String nodeId) {
+        this.planId = planId;
+        this.nodeId = nodeId;
+        participants = new HashSet<>();
+        partition2nodeMap = new HashMap<>();
+        pendingRequests = new HashMap<>();
+        state = FailbackPlanState.PREPARING;
+    }
+
+    public synchronized void addPartitionToFailback(int partitionId, String currentActiveNode) {
+        partition2nodeMap.put(partitionId, currentActiveNode);
+    }
+
+    public synchronized void addParticipant(String nodeId) {
+        participants.add(nodeId);
+    }
+
+    public synchronized void notifyNodeFailure(String failedNode) {
+        if (participants.contains(failedNode)) {
+            if (state == FailbackPlanState.PREPARING) {
+                state = FailbackPlanState.FAILED;
+            } else if (state == FailbackPlanState.PENDING_PARTICIPANT_REPONSE) {
+                /**
+                 * if there are any pending requests from this failed node,
+                 * they should be marked as completed and the plan should be marked as failed
+                 */
+                Set<Integer> failedRequests = new HashSet<>();
+                for (PreparePartitionsFailbackRequestMessage request : pendingRequests.values()) {
+                    if (request.getNodeID().equals(failedNode)) {
+                        failedRequests.add(request.getRequestId());
+                    }
+                }
+
+                if (failedRequests.size() > 0) {
+                    state = FailbackPlanState.FAILED;
+                    for (Integer failedRequestId : failedRequests) {
+                        markRequestCompleted(failedRequestId);
+                    }
+                }
+            }
+        } else if (nodeId.equals(failedNode)) {
+            //if the failing back node is the failed node itself
+            state = FailbackPlanState.FAILED;
+            updateState();
+        }
+    }
+
+    public synchronized Set<Integer> getPartitionsToFailback() {
+        return new HashSet<>(partition2nodeMap.keySet());
+    }
+
+    public synchronized void addPendingRequest(PreparePartitionsFailbackRequestMessage msg) {
+        //if this is the first request
+        if (pendingRequests.size() == 0) {
+            state = FailbackPlanState.PENDING_PARTICIPANT_REPONSE;
+        }
+        pendingRequests.put(msg.getRequestId(), msg);
+    }
+
+    public synchronized void markRequestCompleted(int requestId) {
+        pendingRequests.remove(requestId);
+        updateState();
+    }
+
+    private void updateState() {
+        if (pendingRequests.size() == 0) {
+            switch (state) {
+                case PREPARING:
+                case FAILED:
+                    state = FailbackPlanState.PENDING_ROLLBACK;
+                    break;
+                case PENDING_PARTICIPANT_REPONSE:
+                    state = FailbackPlanState.PENDING_COMPLETION;
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    public synchronized Set<PreparePartitionsFailbackRequestMessage> getPlanFailbackRequests() {
+        Set<PreparePartitionsFailbackRequestMessage> node2Partitions = new HashSet<>();
+        /**
+         * for each participant, construct a request with the partitions
+         * that will be failed back or flushed.
+         */
+        for (String participant : participants) {
+            Set<Integer> partitionToPrepareForFailback = new HashSet<>();
+            for (Map.Entry<Integer, String> entry : partition2nodeMap.entrySet()) {
+                if (entry.getValue().equals(participant)) {
+                    partitionToPrepareForFailback.add(entry.getKey());
+                }
+            }
+            PreparePartitionsFailbackRequestMessage msg = new PreparePartitionsFailbackRequestMessage(planId,
+                    requestId++, participant, partitionToPrepareForFailback);
+            if (participant.equals(nodeToReleaseMetadataManager)) {
+                msg.setReleaseMetadataNode(true);
+            }
+            node2Partitions.add(msg);
+        }
+        return node2Partitions;
+    }
+
+    public synchronized CompleteFailbackRequestMessage getCompleteFailbackRequestMessage() {
+        return new CompleteFailbackRequestMessage(planId, requestId++, nodeId, getPartitionsToFailback());
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public long getPlanId() {
+        return planId;
+    }
+
+    public void setNodeToReleaseMetadataManager(String nodeToReleaseMetadataManager) {
+        this.nodeToReleaseMetadataManager = nodeToReleaseMetadataManager;
+    }
+
+    public synchronized FailbackPlanState getState() {
+        return state;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Plan ID: " + planId);
+        sb.append(" Failing back node: " + nodeId);
+        sb.append(" Participants: " + participants);
+        sb.append(" Partitions to Failback: " + partition2nodeMap.keySet());
+        return sb.toString();
+    }
+}

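To make the state machine above concrete, a walkthrough of a plan's happy path with one participant; node ids and partition numbers are made up:

import org.apache.asterix.common.messaging.PreparePartitionsFailbackRequestMessage;
import org.apache.asterix.common.replication.NodeFailbackPlan;

public class FailbackPlanWalkthrough {
    public static void main(String[] args) {
        // nc1 is failing back; nc2 currently hosts its partitions 0 and 1
        NodeFailbackPlan plan = NodeFailbackPlan.createPlan("nc1");
        plan.addParticipant("nc2");
        plan.addPartitionToFailback(0, "nc2");
        plan.addPartitionToFailback(1, "nc2");
        // PREPARING -> PENDING_PARTICIPANT_REPONSE on the first pending request
        for (PreparePartitionsFailbackRequestMessage msg : plan.getPlanFailbackRequests()) {
            plan.addPendingRequest(msg);
            // ... send msg to its participant here ...
        }
        System.out.println(plan.getState()); // PENDING_PARTICIPANT_REPONSE
        // the single participant responds; request ids start at 0
        plan.markRequestCompleted(0);
        System.out.println(plan.getState()); // PENDING_COMPLETION
        // finally, tell nc1 to complete the failback for partitions {0, 1}
        System.out.println(plan.getCompleteFailbackRequestMessage());
    }
}
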
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
index 0797a02..ae02ca9 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/ReplicaEvent.java
@@ -23,23 +23,14 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
-public class ReplicaEvent {
+import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 
-    /*
-     * FAIL: remote replica failed.
-     * JOIN: remote replica rejoined the cluster.
-     * SHUTDOWN: remote replica is shutting down normally
-     * */
-    public enum ReplicaEventType {
-        FAIL,
-        JOIN,
-        SHUTDOWN
-    }
+public class ReplicaEvent {
 
     Replica replica;
-    ReplicaEventType eventType;
+    ClusterEventType eventType;
 
-    public ReplicaEvent(Replica replica, ReplicaEventType eventType) {
+    public ReplicaEvent(Replica replica, ClusterEventType eventType) {
         this.replica = replica;
         this.eventType = eventType;
     }
@@ -52,11 +43,11 @@ public class ReplicaEvent {
         this.replica = replica;
     }
 
-    public ReplicaEventType getEventType() {
+    public ClusterEventType getEventType() {
         return eventType;
     }
 
-    public void setEventType(ReplicaEventType eventType) {
+    public void setEventType(ClusterEventType eventType) {
         this.eventType = eventType;
     }
 
@@ -68,7 +59,7 @@ public class ReplicaEvent {
 
     public static ReplicaEvent create(DataInput input) throws IOException {
         Replica replica = Replica.create(input);
-        ReplicaEventType eventType = ReplicaEventType.values()[input.readInt()];
+        ClusterEventType eventType = ClusterEventType.values()[input.readInt()];
         ReplicaEvent event = new ReplicaEvent(replica, eventType);
         return event;
     }

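For reference, a small decode-side sketch; ReplicaEvent.create reads the event type by ordinal, now drawn from the shared ClusterEventType enum instead of the removed ReplicaEventType:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.asterix.common.replication.ReplicaEvent;

public class ReplicaEventDecodeSketch {
    // bytes would come off a replication channel; their layout is the Replica fields plus the event ordinal
    public static ReplicaEvent decode(byte[] bytes) throws IOException {
        return ReplicaEvent.create(new DataInputStream(new ByteArrayInputStream(bytes)));
    }
}
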
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index a88c985..7e27c54 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -31,10 +31,10 @@ public interface ILogRecord {
         OK
     }
 
-    public static final int JOB_TERMINATE_LOG_SIZE = 18; //JOB_COMMIT or ABORT log type
+    public static final int JOB_TERMINATE_LOG_SIZE = 14; //JOB_COMMIT or ABORT log type
     public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
     public static final int UPDATE_LOG_BASE_SIZE = 59;
-    public static final int FLUSH_LOG_SIZE = 22;
+    public static final int FLUSH_LOG_SIZE = 18;
 
     public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
 
@@ -112,11 +112,9 @@ public interface ILogRecord {
 
     public String getNodeId();
 
-    public void setNodeId(String nodeId);
-
     public int writeRemoteRecoveryLog(ByteBuffer buffer);
 
-    public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId);
+    public RECORD_STATUS readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog);
 
     public void setReplicationThread(IReplicationThread replicationThread);
 
@@ -130,4 +128,16 @@ public interface ILogRecord {
 
     public ByteBuffer getSerializedLog();
 
+    public void setNodeId(String nodeId);
+
+    public int getResourcePartition();
+
+    public void setResourcePartition(int resourcePartition);
+
+    public void setReplicated(boolean replicated);
+
+    /**
+     * @return a flag indicating whether the log record should be sent to remote replicas
+     */
+    public boolean isReplicated();
 }

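Since readRemoteLog now reports a RECORD_STATUS instead of assuming a complete buffer, callers can detect partial records and refill; a minimal read-loop sketch (the refill itself is elided):

import java.nio.ByteBuffer;

import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.LogRecord;

public class RemoteLogReadSketch {
    public static void readAll(ByteBuffer buffer, boolean remoteRecoveryLog) {
        ILogRecord logRecord = new LogRecord();
        while (buffer.hasRemaining()) {
            ILogRecord.RECORD_STATUS status = logRecord.readRemoteLog(buffer, remoteRecoveryLog);
            if (status == ILogRecord.RECORD_STATUS.TRUNCATED) {
                // readRemoteLog has reset the buffer position; refill from the source and retry
                break;
            }
            // ... process the complete record ...
        }
    }
}
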
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index cb3f8c3..a3115e7 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -18,8 +18,9 @@
  */
 package org.apache.asterix.common.transactions;
 
+import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -82,31 +83,25 @@ public interface IRecoveryManager {
 
     /**
      * Makes a system checkpoint.
-     * @param isSharpCheckpoint a flag indicating whether to perform a sharp or non-sharp checkpoint.
-     * @param nonSharpCheckpointTargetLSN if a non-sharp checkpoint to be performed, what is the minimum LSN it should target.
+     *
+     * @param isSharpCheckpoint
+     *            a flag indicating whether to perform a sharp or non-sharp checkpoint.
+     * @param nonSharpCheckpointTargetLSN
+     *            if a non-sharp checkpoint is to be performed, the minimum LSN it should target.
      * @return the LSN at which the checkpoint was performed.
      * @throws ACIDException
      * @throws HyracksDataException
      */
-    public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN) throws ACIDException, HyracksDataException;
+    public long checkpoint(boolean isSharpCheckpoint, long nonSharpCheckpointTargetLSN)
+            throws ACIDException, HyracksDataException;
 
     /**
-     * Performs recovery based on the passed logs
-     * @param remoteLogs the remote logs to be replayed
-     * @throws HyracksDataException
-     * @throws ACIDException
-     */
-    public void replayRemoteLogs(ArrayList<ILogRecord> remoteLogs) throws HyracksDataException, ACIDException;
-
-    /**
-     *
      * @return min first LSN of the open indexes (including remote indexes if replication is enabled)
      * @throws HyracksDataException
      */
     public long getMinFirstLSN() throws HyracksDataException;
 
     /**
-     *
      * @return min first LSN of the open indexes
      * @throws HyracksDataException
      */
@@ -114,11 +109,29 @@ public interface IRecoveryManager {
 
     /**
      * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+     *
      * @param partitions
      * @param lowWaterMarkLSN
     * @param logReader
      * @throws IOException
      * @throws ACIDException
      */
-    public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
+    public void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN)
+            throws IOException, ACIDException;
+
+    /**
+     * Creates a temporary file to be used during recovery
+     *
+     * @param jobId
+     * @param fileName
+     * @return the created temporary file
+     * @throws IOException
+     *             if the file for the specified {@code jobId} with the {@code fileName} already exists
+     */
+    public File createJobRecoveryFile(int jobId, String fileName) throws IOException;
+
+    /**
+     * Deletes all temporary recovery files
+     */
+    public void deleteRecoveryTemporaryFiles();
 }

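A sketch of the temp-file lifecycle these two methods introduce; the job id and file name are illustrative, and the replay call is shown only in a comment since it needs a real ILogReader:

import java.io.File;

import org.apache.asterix.common.transactions.IRecoveryManager;

public class JobRecoveryFileSketch {
    public static void example(IRecoveryManager recoveryManager) throws Exception {
        // one file per recovery job; throws IOException if it already exists
        File recoveryLogsFile = recoveryManager.createJobRecoveryFile(1, "recovery.log");
        try {
            // ... fill recoveryLogsFile via IReplicationManager.requestReplicaLogs(...),
            // then replay: recoveryManager.replayPartitionsLogs(partitions, logReader, lowWaterMarkLSN);
            System.out.println("spilling logs to " + recoveryLogsFile.getAbsolutePath());
        } finally {
            recoveryManager.deleteRecoveryTemporaryFiles();
        }
    }
}
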
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index d4a96a5..d09f6ca 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -20,8 +20,6 @@ package org.apache.asterix.common.transactions;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.CRC32;
 
@@ -34,15 +32,14 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
 /*
  * == LogRecordFormat ==
  * ---------------------------
- * [Header1] (10 bytes + NodeId Length) : for all log types
+ * [Header1] (6 bytes) : for all log types
  * LogSource(1)
  * LogType(1)
  * JobId(4)
- * NodeIdLength(4)
- * NodeId(?)
  * ---------------------------
- * [Header2] (12 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types
+ * [Header2] (16 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types
  * DatasetId(4) //stored in dataset_dataset in Metadata Node
+ * ResourcePartition(4)
  * PKHashValue(4)
  * PKValueSize(4)
  * PKValue(PKValueSize)
@@ -52,7 +49,7 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
  * ResourceId(8) //stored in .metadata of the corresponding index in NC node
  * LogRecordSize(4)
  * ---------------------------
- * [Body] (Variable size) : only for update log type
+ * [Body] (9 bytes + NewValueSize) : only for update log type
  * FieldCnt(4)
  * NewOp(1)
  * NewValueSize(4)
@@ -62,19 +59,18 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
  * Checksum(8)
  * ---------------------------
  * = LogSize =
- * 1) JOB_COMMIT_LOG_SIZE: 13 bytes (5 + 8)
- * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: 25 + PKSize (5 + 12 + PKSize + 8)
- *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 25
- * 3) UPDATE: 54 + PKValueSize + NewValueSize (5 + 12 + PKValueSize + 20 + 9 + NewValueSize + 8)
- * 4) FLUSH: 5 + 8 + DatasetId(4) (In case of serialize: + (8 bytes for LSN) + (4 bytes for number of flushed indexes)
+ * 1) JOB_COMMIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
+ * 2) ENTITY_COMMIT || UPSERT_ENTITY_COMMIT: (Header1(6) + Header2(16) + Tail(8)) + PKValueSize
+ *    --> ENTITY_COMMIT_LOG_BASE_SIZE = 30
+ * 3) UPDATE: (Header1(6) + Header2(16) + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
+ *    --> UPDATE_LOG_BASE_SIZE = 59
+ * 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
  */
 
 public class LogRecord implements ILogRecord {
 
     // ------------- fields in a log record (begin) ------------//
     private byte logSource;
-    private String nodeId;
-    private int nodeIdLength;
     private byte logType;
     private int jobId;
     private int datasetId;
@@ -83,6 +79,7 @@ public class LogRecord implements ILogRecord {
     private ITupleReference PKValue;
     private long prevLSN;
     private long resourceId;
+    private int resourcePartition;
     private int logSize;
     private int fieldCnt;
     private byte newOp;
@@ -103,9 +100,13 @@ public class LogRecord implements ILogRecord {
     private PrimaryIndexOperationTracker opTracker;
     private IReplicationThread replicationThread;
     private ByteBuffer serializedLog;
-    private final Map<String, byte[]> nodeIdsMap;
-    // this field is used for serialized flush logs only to indicate how many indexes were flushed using its LSN.
+    /**
+     * The fields (numOfFlushedIndexes and nodeId) are used for serialized flush logs only
+     * to indicate the source of the log and how many indexes were flushed using its LSN.
+     */
     private int numOfFlushedIndexes;
+    private String nodeId;
+    private boolean replicated = false;
 
     public LogRecord() {
         isFlushed = new AtomicBoolean(false);
@@ -113,16 +114,15 @@ public class LogRecord implements ILogRecord {
         readPKValue = new PrimaryKeyTupleReference();
         readNewValue = (SimpleTupleReference) tupleWriter.createTupleReference();
         checksumGen = new CRC32();
-        this.nodeIdsMap = new HashMap<String, byte[]>();
         logSource = LogSource.LOCAL;
     }
 
     private final static int LOG_SOURCE_LEN = Byte.BYTES;
-    private final static int NODE_ID_STRING_LENGTH = Integer.BYTES;
     private final static int TYPE_LEN = Byte.BYTES;
     public final static int PKHASH_LEN = Integer.BYTES;
     public final static int PKSZ_LEN = Integer.BYTES;
     private final static int PRVLSN_LEN = Long.BYTES;
+    private final static int RS_PARTITION_LEN = Integer.BYTES;
     private final static int RSID_LEN = Long.BYTES;
     private final static int LOGRCD_SZ_LEN = Integer.BYTES;
     private final static int FLDCNT_LEN = Integer.BYTES;
@@ -130,27 +130,19 @@ public class LogRecord implements ILogRecord {
     private final static int NEWVALSZ_LEN = Integer.BYTES;
     private final static int CHKSUM_LEN = Long.BYTES;
 
-    private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES + NODE_ID_STRING_LENGTH;
-    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+    private final static int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+    private final static int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
+            + PKSZ_LEN;
     private final static int UPDATE_LSN_HEADER = PRVLSN_LEN + RSID_LEN + LOGRCD_SZ_LEN;
     private final static int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+    private final static int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
 
     private void writeLogRecordCommonFields(ByteBuffer buffer) {
         buffer.put(logSource);
         buffer.put(logType);
         buffer.putInt(jobId);
-        if (nodeIdsMap.containsKey(nodeId)) {
-            buffer.put(nodeIdsMap.get(nodeId));
-        } else {
-            // byte array for node id length and string
-            byte[] bytes = new byte[(Integer.SIZE / 8) + nodeId.length()];
-            buffer.putInt(nodeId.length());
-            buffer.put(nodeId.getBytes(java.nio.charset.StandardCharsets.UTF_8));
-            buffer.position(buffer.position() - bytes.length);
-            buffer.get(bytes, 0, bytes.length);
-            nodeIdsMap.put(nodeId, bytes);
-        }
         if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT) {
+            buffer.putInt(resourcePartition);
             buffer.putInt(datasetId);
             buffer.putInt(PKHashValue);
             if (PKValueSize <= 0) {
@@ -168,7 +160,6 @@ public class LogRecord implements ILogRecord {
             buffer.putInt(newValueSize);
             writeTuple(buffer, newValue, newValueSize);
         }
-
         if (logType == LogType.FLUSH) {
             buffer.putInt(datasetId);
         }
@@ -188,10 +179,9 @@ public class LogRecord implements ILogRecord {
         int beginOffset = buffer.position();
         writeLogRecordCommonFields(buffer);
 
-        if (logSource == LogSource.LOCAL) {
-            // copy the serialized log to send it to replicas
-            int serializedLogSize = getSerializedLogSize(logType, logSize);
-
+        if (replicated) {
+            //copy the serialized log to send it to replicas
+            int serializedLogSize = getSerializedLogSize();
             if (serializedLog == null || serializedLog.capacity() < serializedLogSize) {
                 serializedLog = ByteBuffer.allocate(serializedLogSize);
             } else {
@@ -207,6 +197,8 @@ public class LogRecord implements ILogRecord {
             if (logType == LogType.FLUSH) {
                 serializedLog.putLong(appendLSN);
                 serializedLog.putInt(numOfFlushedIndexes);
+                serializedLog.putInt(nodeId.length());
+                serializedLog.put(nodeId.getBytes());
             }
             serializedLog.flip();
             buffer.position(currentPosition);
@@ -244,15 +236,8 @@ public class LogRecord implements ILogRecord {
     public RECORD_STATUS readLogRecord(ByteBuffer buffer) {
         int beginOffset = buffer.position();
 
-        // read header
-        RECORD_STATUS status = readLogHeader(buffer);
-        if (status != RECORD_STATUS.OK) {
-            buffer.position(beginOffset);
-            return status;
-        }
-
-        // read body
-        status = readLogBody(buffer, false);
+        //read common fields
+        RECORD_STATUS status = readLogCommonFields(buffer);
         if (status != RECORD_STATUS.OK) {
             buffer.position(beginOffset);
             return status;
@@ -271,38 +256,25 @@ public class LogRecord implements ILogRecord {
         return RECORD_STATUS.OK;
     }
 
-    private RECORD_STATUS readLogHeader(ByteBuffer buffer) {
-        // first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
+    private RECORD_STATUS readLogCommonFields(ByteBuffer buffer) {
+        //first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
         if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
             return RECORD_STATUS.TRUNCATED;
         }
         logSource = buffer.get();
         logType = buffer.get();
         jobId = buffer.getInt();
-        nodeIdLength = buffer.getInt();
-        // attempt to read node id
-        if (buffer.remaining() < nodeIdLength) {
-            return RECORD_STATUS.TRUNCATED;
-        }
-        // read node id string
-        nodeId = new String(buffer.array(), buffer.position() + buffer.arrayOffset(), nodeIdLength,
-                java.nio.charset.StandardCharsets.UTF_8);
-        // skip node id string bytes
-        buffer.position(buffer.position() + nodeIdLength);
 
-        return RECORD_STATUS.OK;
-    }
-
-    private RECORD_STATUS readLogBody(ByteBuffer buffer, boolean allocateTupleBuffer) {
         if (logType != LogType.FLUSH) {
             if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
                 datasetId = -1;
                 PKHashValue = -1;
             } else {
-                // attempt to read in the dsid, PK hash and PK length
+                //attempt to read in the resourcePartition, dsid, PK hash and PK length
                 if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) {
                     return RECORD_STATUS.TRUNCATED;
                 }
+                resourcePartition = buffer.getInt();
                 datasetId = buffer.getInt();
                 PKHashValue = buffer.getInt();
                 PKValueSize = buffer.getInt();
@@ -330,16 +302,7 @@ public class LogRecord implements ILogRecord {
                 if (buffer.remaining() < newValueSize) {
                     return RECORD_STATUS.TRUNCATED;
                 }
-                if (!allocateTupleBuffer) {
-                    newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
-                } else {
-                    ByteBuffer tupleBuffer = ByteBuffer.allocate(newValueSize);
-                    tupleBuffer.put(buffer.array(), buffer.position(), newValueSize);
-                    tupleBuffer.flip();
-                    newValue = readTuple(tupleBuffer, readNewValue, fieldCnt, newValueSize);
-                    // skip tuple bytes
-                    buffer.position(buffer.position() + newValueSize);
-                }
+                newValue = readTuple(buffer, readNewValue, fieldCnt, newValueSize);
             } else {
                 computeAndSetLogSize();
             }
@@ -356,25 +319,47 @@ public class LogRecord implements ILogRecord {
     }
 
     @Override
-    public void readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog, String localNodeId) {
-        readLogHeader(buffer);
-        if (!remoteRecoveryLog || !nodeId.equals(localNodeId)) {
-            readLogBody(buffer, false);
-        } else {
-            // need to allocate buffer for tuple since the logs will be kept in memory to use during remote recovery
-            // TODO when this is redesigned to spill remote recovery logs to disk, this will not be needed
-            readLogBody(buffer, true);
+    public RECORD_STATUS readRemoteLog(ByteBuffer buffer, boolean remoteRecoveryLog) {
+        int beginOffset = buffer.position();
+
+        //read common fields
+        RECORD_STATUS status = readLogCommonFields(buffer);
+        if (status != RECORD_STATUS.OK) {
+            buffer.position(beginOffset);
+            return status;
         }
 
         if (logType == LogType.FLUSH) {
-            LSN = buffer.getLong();
-            numOfFlushedIndexes = buffer.getInt();
+            if (buffer.remaining() >= REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN) {
+                LSN = buffer.getLong();
+                numOfFlushedIndexes = buffer.getInt();
+                //read serialized node id
+                int nodeIdLength = buffer.getInt();
+                if (buffer.remaining() >= nodeIdLength) {
+                    byte[] nodeIdBytes = new byte[nodeIdLength];
+                    buffer.get(nodeIdBytes);
+                    nodeId = new String(nodeIdBytes);
+                } else {
+                    buffer.position(beginOffset);
+                    return RECORD_STATUS.TRUNCATED;
+                }
+            } else {
+                buffer.position(beginOffset);
+                return RECORD_STATUS.TRUNCATED;
+            }
         }
 
-        // remote recovery logs need to have the LSN to check which should be replayed
-        if (remoteRecoveryLog && nodeId.equals(localNodeId)) {
-            LSN = buffer.getLong();
+        //remote recovery logs need to have the LSN to check which should be replayed
+        if (remoteRecoveryLog) {
+            if (buffer.remaining() >= Long.BYTES) {
+                LSN = buffer.getLong();
+            } else {
+                buffer.position(beginOffset);
+                return RECORD_STATUS.TRUNCATED;
+            }
         }
+
+        return RECORD_STATUS.OK;
     }
 
     private ITupleReference readPKValue(ByteBuffer buffer) {
@@ -430,21 +415,19 @@ public class LogRecord implements ILogRecord {
             default:
                 throw new IllegalStateException("Unsupported Log Type");
         }
-
-        logSize += nodeIdLength;
     }
 
     @Override
     public String getLogRecordForDisplay() {
         StringBuilder builder = new StringBuilder();
         builder.append(" Source : ").append(LogSource.toString(logSource));
-        builder.append(" NodeID : ").append(nodeId);
         builder.append(" LSN : ").append(LSN);
         builder.append(" LogType : ").append(LogType.toString(logType));
         builder.append(" LogSize : ").append(logSize);
         builder.append(" JobId : ").append(jobId);
         if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
             builder.append(" DatasetId : ").append(datasetId);
+            builder.append(" ResourcePartition : ").append(resourcePartition);
             builder.append(" PKHashValue : ").append(PKHashValue);
             builder.append(" PKFieldCnt : ").append(PKFieldCnt);
             builder.append(" PKSize: ").append(PKValueSize);
@@ -460,11 +443,8 @@ public class LogRecord implements ILogRecord {
     public int writeRemoteRecoveryLog(ByteBuffer buffer) {
         int bufferBegin = buffer.position();
         writeLogRecordCommonFields(buffer);
-        if (logType == LogType.FLUSH) {
-            buffer.putLong(LSN);
-            buffer.putInt(numOfFlushedIndexes);
-        }
-        // LSN must be included in all remote recovery logs (not only FLUSH)
+        //FLUSH logs should not be included in remote recovery
+        //LSN must be included in all remote recovery logs
         buffer.putLong(LSN);
         return buffer.position() - bufferBegin;
     }
@@ -560,21 +540,21 @@ public class LogRecord implements ILogRecord {
 
     @Override
     public int getSerializedLogSize() {
-        return getSerializedLogSize(logType, logSize);
-    }
-
-    private static int getSerializedLogSize(Byte logType, int logSize) {
+        int serializedSize = logSize;
         if (logType == LogType.FLUSH) {
-            // LSN
-            logSize += (Long.SIZE / 8);
-            // num of indexes
-            logSize += (Integer.SIZE / 8);
+            //LSN
+            serializedSize += Long.BYTES;
+            //num of indexes
+            serializedSize += Integer.BYTES;
+            //serialized node id String
+            serializedSize += Integer.BYTES + nodeId.length();
         }
-
-        // checksum not included in serialized version
-        logSize -= CHKSUM_LEN;
-
-        return logSize;
+        if (logSource == LogSource.REMOTE_RECOVERY) {
+            //for LSN
+            serializedSize += Long.BYTES;
+        }
+        serializedSize -= CHKSUM_LEN;
+        return serializedSize;
     }
 
     @Override
@@ -675,7 +655,6 @@ public class LogRecord implements ILogRecord {
     @Override
     public void setNodeId(String nodeId) {
         this.nodeId = nodeId;
-        this.nodeIdLength = nodeId.length();
     }
 
     public IReplicationThread getReplicationThread() {
@@ -713,4 +692,23 @@ public class LogRecord implements ILogRecord {
         this.opTracker = opTracker;
     }
 
-}
+    @Override
+    public int getResourcePartition() {
+        return resourcePartition;
+    }
+
+    @Override
+    public void setResourcePartition(int resourcePartition) {
+        this.resourcePartition = resourcePartition;
+    }
+
+    @Override
+    public void setReplicated(boolean replicate) {
+        this.replicated = replicate;
+    }
+
+    @Override
+    public boolean isReplicated() {
+        return replicated;
+    }
+}
\ No newline at end of file

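The revised size constants follow directly from the layout comment in this file; a quick arithmetic check against JOB_TERMINATE_LOG_SIZE, ENTITY_COMMIT_LOG_BASE_SIZE, UPDATE_LOG_BASE_SIZE, and FLUSH_LOG_SIZE:

public class LogSizeArithmetic {
    public static void main(String[] args) {
        int header1 = 1 + 1 + 4;     // LogSource(1) + LogType(1) + JobId(4) = 6
        int header2 = 4 + 4 + 4 + 4; // ResourcePartition(4) + DatasetId(4) + PKHashValue(4) + PKValueSize(4) = 16
        int header3 = 8 + 8 + 4;     // PrevLSN(8) + ResourceId(8) + LogRecordSize(4) = 20
        int body = 4 + 1 + 4;        // FieldCnt(4) + NewOp(1) + NewValueSize(4) = 9
        int tail = 8;                // Checksum(8)
        System.out.println(header1 + tail);                            // 14 = JOB_TERMINATE_LOG_SIZE
        System.out.println(header1 + header2 + tail);                  // 30 = ENTITY_COMMIT_LOG_BASE_SIZE
        System.out.println(header1 + header2 + header3 + body + tail); // 59 = UPDATE_LOG_BASE_SIZE
        System.out.println(header1 + 4 + tail);                        // 18 = FLUSH_LOG_SIZE (DatasetId only)
    }
}
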
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
new file mode 100644
index 0000000..b75d16c
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/ServletUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+public class ServletUtil {
+
+    public enum Servlets {
+        AQL("/aql"),
+        AQL_QUERY("/query"),
+        AQL_UPDATE("/update"),
+        AQL_DDL("/ddl"),
+        SQLPP("/sqlpp"),
+        SQLPP_QUERY("/query/sqlpp"),
+        SQLPP_UPDATE("/update/sqlpp"),
+        SQLPP_DDL("/ddl/sqlpp"),
+        QUERY_STATUS("/query/status"),
+        QUERY_RESULT("/query/result"),
+        QUERY_SERVICE("/query/service"),
+        CONNECTOR("/connector"),
+        SHUTDOWN("/admin/shutdown"),
+        VERSION("/admin/version"),
+        CLUSTER_STATE("/admin/cluster");
+
+        private final String path;
+
+        private Servlets(String path) {
+            this.path = path;
+        }
+
+        public String getPath() {
+            return path;
+        }
+    }
+
+    private ServletUtil() {
+        throw new AssertionError("No objects of this class should be created.");
+    }
+}

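The enum centralizes the REST endpoint paths; use is just a lookup, e.g. a hypothetical listing of every registered path:

import org.apache.asterix.common.utils.ServletUtil.Servlets;

public class ServletPathSketch {
    public static void main(String[] args) {
        for (Servlets servlet : Servlets.values()) {
            System.out.println(servlet.name() + " -> " + servlet.getPath());
        }
        System.out.println(Servlets.CLUSTER_STATE.getPath()); // /admin/cluster
    }
}
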
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 48e42bd..5b4035c 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -66,4 +66,8 @@ public class StoragePathUtil {
     private static String prepareFullIndexName(String datasetName, String idxName) {
         return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
     }
+
+    public static int getPartitonNumFromName(String name) {
+        return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
+    }
 }

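The new helper just strips the partition directory prefix and parses the rest; assuming PARTITION_DIR_PREFIX (defined elsewhere in this class, not shown in this hunk) is "partition_", e.g.:

import org.apache.asterix.common.utils.StoragePathUtil;

public class PartitionNameSketch {
    public static void main(String[] args) {
        // "partition_" is an assumption here; only the strip-and-parse contract is visible in this hunk
        System.out.println(StoragePathUtil.getPartitonNumFromName("partition_3")); // 3
    }
}
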
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index 97674e6..1d5b15e 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -28,61 +28,39 @@ public class TransactionUtil {
 
     public static void formJobTerminateLogRecord(ITransactionContext txnCtx, LogRecord logRecord, boolean isCommit) {
         logRecord.setTxnCtx(txnCtx);
-        TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getJobId().getId(), isCommit,
-                logRecord.getNodeId());
+        TransactionUtil.formJobTerminateLogRecord(logRecord, txnCtx.getJobId().getId(), isCommit);
     }
 
-    public static void formJobTerminateLogRecord(LogRecord logRecord, int jobId, boolean isCommit, String nodeId) {
+    public static void formJobTerminateLogRecord(LogRecord logRecord, int jobId, boolean isCommit) {
         logRecord.setLogType(isCommit ? LogType.JOB_COMMIT : LogType.ABORT);
         logRecord.setDatasetId(-1);
         logRecord.setPKHashValue(-1);
         logRecord.setJobId(jobId);
-        logRecord.setNodeId(nodeId);
         logRecord.computeAndSetLogSize();
     }
 
     public static void formFlushLogRecord(LogRecord logRecord, int datasetId, PrimaryIndexOperationTracker opTracker,
-            int numOfFlushedIndexes) {
-        formFlushLogRecord(logRecord, datasetId, opTracker, null, numOfFlushedIndexes);
-    }
-
-    public static void formFlushLogRecord(LogRecord logRecord, int datasetId, PrimaryIndexOperationTracker opTracker,
             String nodeId, int numberOfIndexes) {
         logRecord.setLogType(LogType.FLUSH);
         logRecord.setJobId(-1);
         logRecord.setDatasetId(datasetId);
         logRecord.setOpTracker(opTracker);
         logRecord.setNumOfFlushedIndexes(numberOfIndexes);
-        if (nodeId != null) {
-            logRecord.setNodeId(nodeId);
-        }
+        logRecord.setNodeId(nodeId);
         logRecord.computeAndSetLogSize();
     }
 
     public static void formEntityCommitLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
-            int PKHashValue, ITupleReference PKValue, int[] PKFields) {
-        logRecord.setTxnCtx(txnCtx);
-        logRecord.setLogType(LogType.ENTITY_COMMIT);
-        logRecord.setJobId(txnCtx.getJobId().getId());
-        logRecord.setDatasetId(datasetId);
-        logRecord.setPKHashValue(PKHashValue);
-        logRecord.setPKFieldCnt(PKFields.length);
-        logRecord.setPKValue(PKValue);
-        logRecord.setPKFields(PKFields);
-        logRecord.computeAndSetPKValueSize();
-        logRecord.computeAndSetLogSize();
-    }
-
-    public static void formEntityUpsertCommitLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
-            int PKHashValue, ITupleReference PKValue, int[] PKFields) {
+            int PKHashValue, ITupleReference PKValue, int[] PKFields, int resourcePartition, byte entityCommitType) {
         logRecord.setTxnCtx(txnCtx);
-        logRecord.setLogType(LogType.UPSERT_ENTITY_COMMIT);
+        logRecord.setLogType(entityCommitType);
         logRecord.setJobId(txnCtx.getJobId().getId());
         logRecord.setDatasetId(datasetId);
         logRecord.setPKHashValue(PKHashValue);
         logRecord.setPKFieldCnt(PKFields.length);
         logRecord.setPKValue(PKValue);
         logRecord.setPKFields(PKFields);
+        logRecord.setResourcePartition(resourcePartition);
         logRecord.computeAndSetPKValueSize();
         logRecord.computeAndSetLogSize();
     }
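
With this change, the separate entity-commit and upsert-entity-commit builders collapse into a single method that takes the commit log type as its final argument, and each commit record now carries its resource partition. A sketch of the resulting call shape (package names are assumed from this module's layout; all variables are placeholders):

    import org.apache.asterix.common.transactions.ITransactionContext;
    import org.apache.asterix.common.transactions.LogRecord;
    import org.apache.asterix.common.transactions.LogType;
    import org.apache.asterix.common.utils.TransactionUtil;
    import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;

    public class EntityCommitExample {
        static void logEntityCommit(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
                int pkHashValue, ITupleReference pkValue, int[] pkFields, int resourcePartition,
                boolean isUpsert) {
            // One builder now covers both commit flavors; the flavor is the final argument.
            TransactionUtil.formEntityCommitLogRecord(logRecord, txnCtx, datasetId, pkHashValue,
                    pkValue, pkFields, resourcePartition,
                    isUpsert ? LogType.UPSERT_ENTITY_COMMIT : LogType.ENTITY_COMMIT);
        }
    }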

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index f54db63..29e08b5 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -24,7 +24,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
@@ -39,6 +38,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.utils.ServletUtil.Servlets;
 import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat;
 import org.apache.asterix.testframework.context.TestFileContext;
@@ -56,7 +56,6 @@ import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.StringRequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.json.JSONObject;
 
 public class TestExecutor {
@@ -262,6 +261,17 @@ public class TestExecutor {
         return method.getResponseBodyAsStream();
     }
 
+    public InputStream executeClusterStateQuery(OutputFormat fmt, String url) throws Exception {
+        HttpMethodBase method = new GetMethod(url);
+
+        // Set the accepted output response type
+        method.setRequestHeader("Accept", fmt.mimeType());
+        // Provide a custom retry handler if necessary
+        method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false));
+        executeHttpMethod(method);
+        return method.getResponseBodyAsStream();
+    }
+
     // To execute Update statements
     // Insert and Delete statements are executed here
     public void executeUpdate(String str, String url) throws Exception {
@@ -301,7 +311,7 @@ public class TestExecutor {
     }
 
     private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception {
-        final String url = "http://" + host + ":" + port + "/query/result";
+        final String url = "http://" + host + ":" + port + Servlets.QUERY_RESULT.getPath();
 
         // Create a method instance.
         GetMethod method = new GetMethod(url);
@@ -372,6 +382,14 @@ public class TestExecutor {
         return IOUtils.toString(input, StandardCharsets.UTF_8.name());
     }
 
+    private static String executeVagrantManagix(ProcessBuilder pb, String command) throws Exception {
+        pb.command("vagrant", "ssh", "cc", "--", pb.environment().get("MANAGIX_HOME") + command);
+        Process p = pb.start();
+        p.waitFor();
+        InputStream input = p.getInputStream();
+        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+    }
+
     private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
         String targetWord = "queries" + File.separator;
         int targetWordSize = targetWord.lastIndexOf(File.separator);
@@ -439,9 +457,9 @@ public class TestExecutor {
                     switch (ctx.getType()) {
                         case "ddl":
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
                             } else {
-                                executeDDL(statement, "http://" + host + ":" + port + "/ddl/sqlpp");
+                                executeDDL(statement, "http://" + host + ":" + port + Servlets.SQLPP_DDL.getPath());
                             }
                             break;
                         case "update":
@@ -451,9 +469,10 @@ public class TestExecutor {
                                         "127.0.0.1://../../../../../../asterix-app/");
                             }
                             if (ctx.getFile().getName().endsWith("aql")) {
-                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
                             } else {
-                                executeUpdate(statement, "http://" + host + ":" + port + "/update/sqlpp");
+                                executeUpdate(statement,
+                                        "http://" + host + ":" + port + Servlets.SQLPP_UPDATE.getPath());
                             }
                             break;
                         case "query":
@@ -471,24 +490,26 @@ public class TestExecutor {
                             if (ctx.getFile().getName().endsWith("aql")) {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
                                     resultStream = executeQuery(statement, fmt,
-                                            "http://" + host + ":" + port + "/query", cUnit.getParameter());
+                                            "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(),
+                                            cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://" + host + ":" + port + "/aql");
+                                            "http://" + host + ":" + port + Servlets.AQL.getPath());
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://" + host + ":" + port + "/aql");
+                                            "http://" + host + ":" + port + Servlets.AQL.getPath());
                                 }
                             } else {
                                 if (ctx.getType().equalsIgnoreCase("query")) {
                                     resultStream = executeQuery(statement, fmt,
-                                            "http://" + host + ":" + port + "/query/sqlpp", cUnit.getParameter());
+                                            "http://" + host + ":" + port + Servlets.SQLPP_QUERY.getPath(),
+                                            cUnit.getParameter());
                                 } else if (ctx.getType().equalsIgnoreCase("async")) {
                                     resultStream = executeAnyAQLAsync(statement, false, fmt,
-                                            "http://" + host + ":" + port + "/sqlpp");
+                                            "http://" + host + ":" + port + Servlets.SQLPP.getPath());
                                 } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) {
                                     resultStream = executeAnyAQLAsync(statement, true, fmt,
-                                            "http://" + host + ":" + port + "/sqlpp");
+                                            "http://" + host + ":" + port + Servlets.SQLPP.getPath());
                                 }
                             }
 
@@ -505,9 +526,6 @@ public class TestExecutor {
 
                             runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
                                     actualResultFile);
-                            LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
-                                    + " PASSED ");
-
                             queryCount++;
                             break;
                         case "mgx":
@@ -515,7 +533,7 @@ public class TestExecutor {
                             break;
                         case "txnqbc": //qbc represents query before crash
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
                             qbcFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qbc.adm");
@@ -524,20 +542,17 @@ public class TestExecutor {
                             break;
                         case "txnqar": //qar represents query after recovery
                             resultStream = executeQuery(statement, OutputFormat.forCompilationUnit(cUnit),
-                                    "http://" + host + ":" + port + "/query", cUnit.getParameter());
+                                    "http://" + host + ":" + port + Servlets.AQL_QUERY.getPath(), cUnit.getParameter());
                             qarFile = new File(actualPath + File.separator
                                     + testCaseCtx.getTestCase().getFilePath().replace(File.separator, "_") + "_"
                                     + cUnit.getName() + "_qar.adm");
                             qarFile.getParentFile().mkdirs();
                             writeOutputToFile(qarFile, resultStream);
                             runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), qbcFile, qarFile);
-
-                            LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
-                                    + " PASSED ");
                             break;
                         case "txneu": //eu represents erroneous update
                             try {
-                                executeUpdate(statement, "http://" + host + ":" + port + "/update");
+                                executeUpdate(statement, "http://" + host + ":" + port + Servlets.AQL_UPDATE.getPath());
                             } catch (Exception e) {
                                 //An exception is expected.
                                 failed = true;
@@ -565,7 +580,7 @@ public class TestExecutor {
                             break;
                         case "errddl": // a ddlquery that expects error
                             try {
-                                executeDDL(statement, "http://" + host + ":" + port + "/ddl");
+                                executeDDL(statement, "http://" + host + ":" + port + Servlets.AQL_DDL.getPath());
                             } catch (Exception e) {
                                 // expected error happens
                                 failed = true;
@@ -576,7 +591,7 @@ public class TestExecutor {
                             }
                             System.err.println("...but that was expected.");
                             break;
-                        case "vagrant_script":
+                        case "vscript": // a script that will be executed on a Vagrant virtual node
                             try {
                                 String[] command = statement.trim().split(" ");
                                 if (command.length != 2) {
@@ -592,6 +607,32 @@ public class TestExecutor {
                                 throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
                             }
                             break;
+                        case "vmgx": // a managix command that will be executed on the Vagrant CC node
+                            try {
+                                String output = executeVagrantManagix(pb, statement);
+                                if (output.contains("ERROR")) {
+                                    throw new Exception(output);
+                                }
+                            } catch (Exception e) {
+                                throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+                            }
+                            break;
+                        case "cstate": // cluster state query
+                            try {
+                                fmt = OutputFormat.forCompilationUnit(cUnit);
+                                resultStream = executeClusterStateQuery(fmt,
+                                        "http://" + host + ":" + port + Servlets.CLUSTER_STATE.getPath());
+                                expectedResultFile = expectedResultFileCtxs.get(queryCount).getFile();
+                                actualResultFile = testCaseCtx.getActualResultFile(cUnit, new File(actualPath));
+                                actualResultFile.getParentFile().mkdirs();
+                                writeOutputToFile(actualResultFile, resultStream);
+                                runScriptAndCompareWithResult(testFile, new PrintWriter(System.err), expectedResultFile,
+                                        actualResultFile);
+                                queryCount++;
+                            } catch (Exception e) {
+                                throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+                            }
+                            break;
                         default:
                             throw new IllegalArgumentException("No statements of type " + ctx.getType());
                     }
@@ -626,6 +667,9 @@ public class TestExecutor {
                                 "Test \"" + cUnit.getName() + "\" FAILED!\nExpected error was not thrown...");
                         e.printStackTrace();
                         throw e;
+                    } else if (numOfFiles == testFileCtxs.size()) {
+                        LOGGER.info("[TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName()
+                                + " PASSED ");
                     }
                 }
             }
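
The new "cstate" test type issues a plain HTTP GET against the cluster-state admin endpoint and diffs the response against an expected-result file. A minimal standalone sketch of the same probe, using the commons-httpclient 3.x API that TestExecutor already depends on (host and port are placeholders):

    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.methods.GetMethod;

    public class ClusterStateProbe {
        public static void main(String[] args) throws Exception {
            // Servlets.CLUSTER_STATE resolves to "/admin/cluster"
            GetMethod method = new GetMethod("http://127.0.0.1:19002/admin/cluster");
            method.setRequestHeader("Accept", "application/json");
            new HttpClient().executeMethod(method);
            System.out.println(method.getResponseBodyAsString());
            method.releaseConnection();
        }
    }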

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
index 86c15ae..62ca9bf 100644
--- a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -58,7 +58,9 @@ public class ReplicationIT {
     private static ProcessBuilder pb;
     private static Map<String, String> env;
     private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS, CLUSTER_CC_API_PORT);
-    private static String SCRIPT_HOME;
+    private static String SCRIPT_HOME = "/vagrant/scripts/";
+    private static String MANAGIX_HOME = "/tmp/asterix/bin/managix "; // trailing space: commands are appended directly
+    private static final String INSTANCE_NAME = "asterix";
     protected TestCaseContext tcCtx;
 
     public ReplicationIT(TestCaseContext tcCtx) {
@@ -107,10 +109,10 @@ public class ReplicationIT {
 
         remoteInvoke("cp -r /vagrant/" + managixFolderName + " /tmp/asterix");
 
-        SCRIPT_HOME = "/vagrant/scripts/";
         pb = new ProcessBuilder();
         env = pb.environment();
         env.put("SCRIPT_HOME", SCRIPT_HOME);
+        env.put("MANAGIX_HOME", MANAGIX_HOME);
         File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
         pb.directory(cwd);
         pb.redirectErrorStream(true);
@@ -141,13 +143,13 @@ public class ReplicationIT {
     @Before
     public void beforeTest() throws Exception {
         //create instance
-        managixInvoke("create -n vagrant-ssh -c /vagrant/cluster_with_replication.xml").getInputStream();
+        managixInvoke("create -n " + INSTANCE_NAME + " -c /vagrant/cluster_with_replication.xml").getInputStream();
     }
 
     @After
     public void afterTest() throws Exception {
         //stop instance
-        managixInvoke("stop -n vagrant-ssh");
+        managixInvoke("stop -n " + INSTANCE_NAME);
 
         //verify that all processes have been stopped
         String killProcesses = "kill_cc_and_nc.sh";
@@ -162,7 +164,7 @@ public class ReplicationIT {
         executeVagrantScript("nc2", deleteStorage);
 
         //delete instance
-        managixInvoke("delete -n vagrant-ssh");
+        managixInvoke("delete -n " + INSTANCE_NAME);
     }
 
     @Test
@@ -244,7 +246,7 @@ public class ReplicationIT {
     }
 
     private static Process managixInvoke(String cmd) throws Exception {
-        return remoteInvoke("/tmp/asterix/bin/managix " + cmd);
+        return remoteInvoke(MANAGIX_HOME + cmd);
     }
 
     private static String executeVagrantScript(String node, String scriptName) throws Exception {
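
Because MANAGIX_HOME deliberately ends with a trailing space, managix subcommands can be appended by plain concatenation, both in managixInvoke above and in the "vmgx" test type that runs the command on the cc VM over vagrant ssh. A hedged sketch of that assembly (the class and method names are illustrative; IOUtils is the commons-io helper already used by these tests):

    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.commons.io.IOUtils;

    public class ManagixOverVagrant {
        private static final String MANAGIX_HOME = "/tmp/asterix/bin/managix "; // trailing space is intentional

        // Runs e.g. "vagrant ssh cc -- /tmp/asterix/bin/managix stop -n asterix"
        static String runOnCC(String managixCommand) throws Exception {
            ProcessBuilder pb = new ProcessBuilder("vagrant", "ssh", "cc", "--",
                    MANAGIX_HOME + managixCommand);
            Process p = pb.start();
            p.waitFor();
            try (InputStream in = p.getInputStream()) {
                return IOUtils.toString(in, StandardCharsets.UTF_8.name());
            }
        }
    }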

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql
new file mode 100644
index 0000000..2c49a01
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.1.ddl.aql
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : node_failback.aql
+ * Description     : Make sure node failback completes as expected.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset,
+                     kill one node and wait until failover completes, query cluster state,
+                     query data, insert new data, start the killed node and wait for failback,
+                     query cluster state, query data.
+ * Expected Result : Success
+ * Date            : February 3 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+    organization-name: string,
+    start-date: date,
+    end-date: date?
+}
+
+create type FacebookUserType as closed {
+    id: int,
+    alias: string,
+    name: string,
+    user-since: datetime,
+    friend-ids: {{ int32 }},
+    employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets  ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
+
+create dataset FacebookUsersInMemory(FacebookUserType)
+primary key id;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql
new file mode 100644
index 0000000..bd01d99
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.10.cstate.aql
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : node_failback.aql
+ * Description     : Make sure node failback completes as expected.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset,
+                     kill one node and wait until failover completes, query cluster state,
+                     query data, insert new data, start the killed node and wait for failback,
+                     query cluster state, query data.
+ * Expected Result : Success
+ * Date            : February 3 2016
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql
new file mode 100644
index 0000000..b09c3d3
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.11.query.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : node_failback.aql
+ * Description     : Make sure node failback completes as expected.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset,
+                     kill one node and wait until failover completes, query cluster state,
+                     query data, insert new data, start the killed node and wait for failback,
+                     query cluster state, query data.
+ * Expected Result : Success
+ * Date            : February 3 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
new file mode 100644
index 0000000..47f5975
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.2.update.aql
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : node_failback.aql
+ * Description     : Make sure node failback completes as expected.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset,
+                     kill one node and wait until failover completes, query cluster state,
+                     query data, insert new data, start the killed node and wait for failback,
+                     query cluster state, query data.
+ * Expected Result : Success
+ * Date            : February 3 2016
+ */
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="asterix_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+
+insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql
new file mode 100644
index 0000000..5eec164
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.3.vscript.aql
@@ -0,0 +1 @@
+nc1 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failback/node_failback/node_failback.4.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file