You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/01/27 00:31:24 UTC

[1/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance

Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master 5b068d2d0 -> 8fc8bf8b5


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
deleted file mode 100644
index 5e0c9b0..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixLSMIndexFileProperties.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
-
-public class AsterixLSMIndexFileProperties {
-
-    private String fileName;
-    private long fileSize;
-    private String nodeId;
-    private String dataverse;
-    private int ioDeviceNum;
-    private String idxName;
-    private boolean lsmComponentFile;
-    private String filePath;
-    private boolean requiresAck = false;
-    private long LSNByteOffset;
-
-    public AsterixLSMIndexFileProperties() {
-    }
-
-    public AsterixLSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
-            long LSNByteOffset, boolean requiresAck) {
-        initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
-    }
-
-    public AsterixLSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
-        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
-                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
-    }
-
-    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
-            boolean requiresAck) {
-        this.filePath = filePath;
-        this.fileSize = fileSize;
-        this.nodeId = nodeId;
-        String[] tokens = filePath.split(File.separator);
-        int arraySize = tokens.length;
-        this.fileName = tokens[arraySize - 1];
-        this.ioDeviceNum = getDeviceIONumFromName(tokens[arraySize - 2]);
-        this.idxName = tokens[arraySize - 3];
-        this.dataverse = tokens[arraySize - 4];
-        this.lsmComponentFile = lsmComponentFile;
-        this.LSNByteOffset = LSNByteOffset;
-        this.requiresAck = requiresAck;
-    }
-
-    public static int getDeviceIONumFromName(String name) {
-        return Integer.parseInt(name.substring(IndexFileNameUtil.IO_DEVICE_NAME_PREFIX.length()));
-    }
-
-    public void serialize(OutputStream out) throws IOException {
-        DataOutputStream dos = new DataOutputStream(out);
-        dos.writeUTF(nodeId);
-        dos.writeUTF(filePath);
-        dos.writeLong(fileSize);
-        dos.writeBoolean(lsmComponentFile);
-        dos.writeLong(LSNByteOffset);
-        dos.writeBoolean(requiresAck);
-    }
-
-    public static AsterixLSMIndexFileProperties create(DataInput input) throws IOException {
-        String nodeId = input.readUTF();
-        String filePath = input.readUTF();
-        long fileSize = input.readLong();
-        boolean lsmComponentFile = input.readBoolean();
-        long LSNByteOffset = input.readLong();
-        boolean requiresAck = input.readBoolean();
-        AsterixLSMIndexFileProperties fileProp = new AsterixLSMIndexFileProperties();
-        fileProp.initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
-        return fileProp;
-    }
-
-    public String getFilePath() {
-        return filePath;
-    }
-
-    public void setFilePath(String filePath) {
-        this.filePath = filePath;
-    }
-
-    public long getFileSize() {
-        return fileSize;
-    }
-
-    public String getFileName() {
-        return fileName;
-    }
-
-    public void setFileName(String fileName) {
-        this.fileName = fileName;
-    }
-
-    public String getNodeId() {
-        return nodeId;
-    }
-
-    public void setNodeId(String nodeId) {
-        this.nodeId = nodeId;
-    }
-
-    public String getDataverse() {
-        return dataverse;
-    }
-
-    public void setDataverse(String dataverse) {
-        this.dataverse = dataverse;
-    }
-
-    public void setFileSize(long fileSize) {
-        this.fileSize = fileSize;
-    }
-
-    public int getIoDeviceNum() {
-        return ioDeviceNum;
-    }
-
-    public void setIoDeviceNum(int ioDevoceNum) {
-        this.ioDeviceNum = ioDevoceNum;
-    }
-
-    public String getIdxName() {
-        return idxName;
-    }
-
-    public void setIdxName(String idxName) {
-        this.idxName = idxName;
-    }
-
-    public boolean isLSMComponentFile() {
-        return lsmComponentFile;
-    }
-
-    public void setLsmComponentFile(boolean lsmComponentFile) {
-        this.lsmComponentFile = lsmComponentFile;
-    }
-
-    public boolean requiresAck() {
-        return requiresAck;
-    }
-
-    public void setRequiresAck(boolean requiresAck) {
-        this.requiresAck = requiresAck;
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("File Name: " + fileName + "  ");
-        sb.append("File Size: " + fileSize + "  ");
-        sb.append("Node ID: " + nodeId + "  ");
-        sb.append("I/O Device: " + ioDeviceNum + "  ");
-        sb.append("IDX Name: " + idxName + "  ");
-        sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
-        sb.append("Dataverse: " + dataverse);
-        sb.append("LSN Byte Offset: " + LSNByteOffset);
-        return sb.toString();
-    }
-
-    public long getLSNByteOffset() {
-        return LSNByteOffset;
-    }
-
-    public void setLSNByteOffset(long lSNByteOffset) {
-        LSNByteOffset = lSNByteOffset;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
index 794a6e1..841a99f 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMComponentProperties.java
@@ -47,9 +47,10 @@ public class LSMComponentProperties {
 
     public LSMComponentProperties(ILSMIndexReplicationJob job, String nodeId) {
         this.nodeId = nodeId;
-        componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0], nodeId);
+        componentId = LSMComponentProperties.getLSMComponentID((String) job.getJobFiles().toArray()[0]);
         numberOfFiles = new AtomicInteger(job.getJobFiles().size());
-        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(), job.getLSMIndexOperationContext());
+        originalLSN = LSMComponentProperties.getLSMComponentLSN((AbstractLSMIndex) job.getLSMIndex(),
+                job.getLSMIndexOperationContext());
         opType = job.getLSMOpType();
     }
 
@@ -94,7 +95,7 @@ public class LSMComponentProperties {
 
     public String getMaskPath(ReplicaResourcesManager resourceManager) {
         if (maskPath == null) {
-            AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
+            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
             maskPath = getReplicaComponentPath(resourceManager) + File.separator + afp.getFileName()
                     + ReplicaResourcesManager.LSM_COMPONENT_MASK_SUFFIX;
         }
@@ -103,9 +104,8 @@ public class LSMComponentProperties {
 
     public String getReplicaComponentPath(ReplicaResourcesManager resourceManager) {
         if (replicaPath == null) {
-            AsterixLSMIndexFileProperties afp = new AsterixLSMIndexFileProperties(this);
-            replicaPath = resourceManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(),
-                    afp.getIdxName());
+            LSMIndexFileProperties afp = new LSMIndexFileProperties(this);
+            replicaPath = resourceManager.getIndexPath(afp);
         }
         return replicaPath;
     }
@@ -113,27 +113,24 @@ public class LSMComponentProperties {
     /***
      * @param filePath
      *            any file of the LSM component
-     * @param nodeId
      * @return a unique id based on the timestamp of the component
      */
-    public static String getLSMComponentID(String filePath, String nodeId) {
+    public static String getLSMComponentID(String filePath) {
         String[] tokens = filePath.split(File.separator);
 
         int arraySize = tokens.length;
         String fileName = tokens[arraySize - 1];
-        String ioDevoceName = tokens[arraySize - 2];
-        String idxName = tokens[arraySize - 3];
-        String dataverse = tokens[arraySize - 4];
+        String idxName = tokens[arraySize - 2];
+        String dataverse = tokens[arraySize - 3];
+        String partitionName = tokens[arraySize - 4];
 
         StringBuilder componentId = new StringBuilder();
-        componentId.append(nodeId);
+        componentId.append(partitionName);
         componentId.append(File.separator);
         componentId.append(dataverse);
         componentId.append(File.separator);
         componentId.append(idxName);
         componentId.append(File.separator);
-        componentId.append(ioDevoceName);
-        componentId.append(File.separator);
         componentId.append(fileName.substring(0, fileName.lastIndexOf(AbstractLSMIndexFileManager.SPLIT_STRING)));
         return componentId.toString();
     }
@@ -142,18 +139,10 @@ public class LSMComponentProperties {
         return componentId;
     }
 
-    public void setComponentId(String componentId) {
-        this.componentId = componentId;
-    }
-
     public long getOriginalLSN() {
         return originalLSN;
     }
 
-    public void setOriginalLSN(long lSN) {
-        originalLSN = lSN;
-    }
-
     public String getNodeId() {
         return nodeId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
new file mode 100644
index 0000000..890d3a2
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/LSMIndexFileProperties.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
+
+public class LSMIndexFileProperties {
+
+    private String fileName;
+    private long fileSize;
+    private String nodeId;
+    private String dataverse;
+    private String idxName;
+    private boolean lsmComponentFile;
+    private String filePath;
+    private boolean requiresAck = false;
+    private long LSNByteOffset;
+    private int partition;
+
+    public LSMIndexFileProperties() {
+    }
+
+    public LSMIndexFileProperties(String filePath, long fileSize, String nodeId, boolean lsmComponentFile,
+            long LSNByteOffset, boolean requiresAck) {
+        initialize(filePath, fileSize, nodeId, lsmComponentFile, LSNByteOffset, requiresAck);
+    }
+
+    public LSMIndexFileProperties(LSMComponentProperties lsmComponentProperties) {
+        initialize(lsmComponentProperties.getComponentId(), -1, lsmComponentProperties.getNodeId(), false,
+                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+    }
+
+    public void initialize(String filePath, long fileSize, String nodeId, boolean lsmComponentFile, long LSNByteOffset,
+            boolean requiresAck) {
+        this.filePath = filePath;
+        this.fileSize = fileSize;
+        this.nodeId = nodeId;
+        this.lsmComponentFile = lsmComponentFile;
+        this.LSNByteOffset = LSNByteOffset;
+        this.requiresAck = requiresAck;
+    }
+
+    public void splitFileName() {
+        String[] tokens = filePath.split(File.separator);
+        int arraySize = tokens.length;
+        this.fileName = tokens[arraySize - 1];
+        this.idxName = tokens[arraySize - 2];
+        this.dataverse = tokens[arraySize - 3];
+        this.partition = getPartitonNumFromName(tokens[arraySize - 4]);
+    }
+
+    private static int getPartitonNumFromName(String name) {
+        return Integer.parseInt(name.substring(StoragePathUtil.PARTITION_DIR_PREFIX.length()));
+    }
+
+    public void serialize(OutputStream out) throws IOException {
+        DataOutputStream dos = new DataOutputStream(out);
+        dos.writeUTF(nodeId);
+        dos.writeUTF(filePath);
+        dos.writeLong(fileSize);
+        dos.writeBoolean(lsmComponentFile);
+        dos.writeLong(LSNByteOffset);
+        dos.writeBoolean(requiresAck);
+    }
+
+    public static LSMIndexFileProperties create(DataInput input) throws IOException {
+        String nodeId = input.readUTF();
+        String filePath = input.readUTF();
+        long fileSize = input.readLong();
+        boolean lsmComponentFile = input.readBoolean();
+        long LSNByteOffset = input.readLong();
+        boolean requiresAck = input.readBoolean();
+        LSMIndexFileProperties fileProp = new LSMIndexFileProperties(filePath, fileSize, nodeId, lsmComponentFile,
+                LSNByteOffset, requiresAck);
+        return fileProp;
+    }
+
+    public String getFilePath() {
+        return filePath;
+    }
+
+    public long getFileSize() {
+        return fileSize;
+    }
+
+    public String getFileName() {
+        return fileName;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public String getDataverse() {
+        return dataverse;
+    }
+
+    public void setDataverse(String dataverse) {
+        this.dataverse = dataverse;
+    }
+
+    public String getIdxName() {
+        return idxName;
+    }
+
+    public boolean isLSMComponentFile() {
+        return lsmComponentFile;
+    }
+
+    public boolean requiresAck() {
+        return requiresAck;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("File Name: " + fileName + "  ");
+        sb.append("File Size: " + fileSize + "  ");
+        sb.append("Node ID: " + nodeId + "  ");
+        sb.append("Partition: " + partition + "  ");
+        sb.append("IDX Name: " + idxName + "  ");
+        sb.append("isLSMComponentFile : " + lsmComponentFile + "  ");
+        sb.append("Dataverse: " + dataverse);
+        sb.append("LSN Byte Offset: " + LSNByteOffset);
+        return sb.toString();
+    }
+
+    public long getLSNByteOffset() {
+        return LSNByteOffset;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 3e3043c..b9f7506 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -38,91 +38,65 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
+import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.LocalResource;
 
 public class ReplicaResourcesManager implements IReplicaResourcesManager {
     private static final Logger LOGGER = Logger.getLogger(ReplicaResourcesManager.class.getName());
-    private final String[] mountPoints;
-    private final int numIODevices;
-    private static final String REPLICA_FOLDER_SUFFIX = "_replica";
-    private final String replicationStorageFolder;
-    public final String localStorageFolder;
-    private final String localNodeID;
     public final static String LSM_COMPONENT_MASK_SUFFIX = "_mask";
     private final static String REPLICA_INDEX_LSN_MAP_NAME = ".LSN_MAP";
     public static final long REPLICA_INDEX_CREATION_LSN = -1;
     private final AtomicLong lastMinRemoteLSN;
+    private final PersistentLocalResourceRepository localRepository;
+    private final Map<String, ClusterPartition[]> nodePartitions;
 
-    public ReplicaResourcesManager(List<IODeviceHandle> devices, String localStorageFolder, String localNodeID,
-            String replicationStorageFolder) throws HyracksDataException {
-        numIODevices = devices.size();
-        this.mountPoints = new String[numIODevices];
-        for (int i = 0; i < numIODevices; i++) {
-            String mountPoint = devices.get(i).getPath().getPath();
-            File mountPointDir = new File(mountPoint);
-            if (!mountPointDir.exists()) {
-                throw new HyracksDataException(mountPointDir.getAbsolutePath() + " doesn't exist.");
-            }
-            if (!mountPoint.endsWith(System.getProperty("file.separator"))) {
-                mountPoints[i] = new String(mountPoint + System.getProperty("file.separator"));
-            } else {
-                mountPoints[i] = new String(mountPoint);
-            }
-        }
-        this.localStorageFolder = localStorageFolder;
-        this.localNodeID = localNodeID;
-        this.replicationStorageFolder = replicationStorageFolder;
+    public ReplicaResourcesManager(ILocalResourceRepository localRepository,
+            AsterixMetadataProperties metadataProperties) {
+        this.localRepository = (PersistentLocalResourceRepository) localRepository;
+        nodePartitions = metadataProperties.getNodePartitions();
         lastMinRemoteLSN = new AtomicLong(-1);
     }
 
-    @Override
-    public String getLocalStorageFolder() {
-        return localStorageFolder;
-    }
-
-    private String getReplicaStorageFolder(String replicaId, int IODeviceNum) {
-        if (replicaId.equals(localNodeID)) {
-            return mountPoints[IODeviceNum] + localStorageFolder;
-        } else {
-            return mountPoints[IODeviceNum] + replicationStorageFolder + File.separator + replicaId
-                    + REPLICA_FOLDER_SUFFIX;
-        }
-    }
-
-    public void deleteRemoteFile(AsterixLSMIndexFileProperties afp) throws IOException {
-        String indexPath = getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(), afp.getDataverse(), afp.getIdxName());
+    public void deleteIndexFile(LSMIndexFileProperties afp) {
+        String indexPath = getIndexPath(afp);
         if (indexPath != null) {
             if (afp.isLSMComponentFile()) {
                 String backupFilePath = indexPath + File.separator + afp.getFileName();
 
                 //delete file
                 File destFile = new File(backupFilePath);
-                if (destFile.exists()) {
-                    destFile.delete();
-                }
+                FileUtils.deleteQuietly(destFile);
             } else {
                 //delete index files
                 indexPath = indexPath.substring(0, indexPath.lastIndexOf(File.separator));
-                AsterixFilesUtil.deleteFolder(indexPath);
+                FileUtils.deleteQuietly(new File(indexPath));
             }
         }
     }
 
-    public String getIndexPath(String replicaId, int IODeviceNum, String dataverse, String dataset) {
-        //mounting point/backupNodeId_replica/Dataverse/Dataset/device_id_#/
-        String remoteIndexFolderPath = getReplicaStorageFolder(replicaId, IODeviceNum) + File.separator + dataverse
-                + File.separator + dataset + File.separator + IndexFileNameUtil.IO_DEVICE_NAME_PREFIX + IODeviceNum;
-        Path path = Paths.get(remoteIndexFolderPath);
+    public String getIndexPath(LSMIndexFileProperties fileProperties) {
+        fileProperties.splitFileName();
+        //get partition path in this node
+        String partitionPath = localRepository.getPartitionPath(fileProperties.getPartition());
+        //get index path
+        String indexPath = SplitsAndConstraintsUtil.getIndexPath(partitionPath, fileProperties.getPartition(),
+                fileProperties.getDataverse(), fileProperties.getIdxName());
+
+        Path path = Paths.get(indexPath);
         if (!Files.exists(path)) {
-            File indexFolder = new File(remoteIndexFolderPath);
+            File indexFolder = new File(indexPath);
             indexFolder.mkdirs();
         }
-        return remoteIndexFolderPath;
+        return indexPath;
     }
 
     public void initializeReplicaIndexLSNMap(String indexPath, long currentLSN) throws IOException {
@@ -144,14 +118,10 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         //remove mask to mark component as valid
         String maskPath = lsmComponentProperties.getMaskPath(this);
         Path path = Paths.get(maskPath);
-
-        if (Files.exists(path)) {
-            File maskFile = new File(maskPath);
-            maskFile.delete();
-        }
+        Files.deleteIfExists(path);
 
         //add component LSN to the index LSNs map
-        HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
+        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(lsmComponentProperties.getReplicaComponentPath(this));
         lsnMap.put(lsmComponentProperties.getOriginalLSN(), lsmComponentProperties.getReplicaLSN());
 
         //update map on disk
@@ -159,73 +129,11 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
 
     }
 
-    public List<String> getResourcesForReplica(String nodeId) throws HyracksDataException {
-        List<String> resourcesList = new ArrayList<String>();
-        String rootFolder;
-        for (int i = 0; i < numIODevices; i++) {
-            rootFolder = getReplicaStorageFolder(nodeId, i);
-            File rootDirFile = new File(rootFolder);
-            if (!rootDirFile.exists()) {
-                continue;
-            }
-
-            File[] dataverseFileList = rootDirFile.listFiles();
-            for (File dataverseFile : dataverseFileList) {
-                if (dataverseFile.isDirectory()) {
-                    File[] indexFileList = dataverseFile.listFiles();
-                    if (indexFileList != null) {
-                        for (File indexFile : indexFileList) {
-                            if (indexFile.isDirectory()) {
-                                File[] ioDevicesList = indexFile.listFiles();
-                                if (ioDevicesList != null) {
-                                    for (File ioDeviceFile : ioDevicesList) {
-                                        if (ioDeviceFile.isDirectory()) {
-                                            File[] metadataFiles = ioDeviceFile.listFiles(LSM_INDEX_FILES_FILTER);
-                                            if (metadataFiles != null) {
-                                                for (File metadataFile : metadataFiles) {
-                                                    resourcesList.add(metadataFile.getAbsolutePath());
-                                                }
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        return resourcesList;
-    }
-
-    public Set<File> getReplicaIndexes(String replicaId) throws HyracksDataException {
+    public Set<File> getReplicaIndexes(String replicaId) {
         Set<File> remoteIndexesPaths = new HashSet<File>();
-        for (int i = 0; i < numIODevices; i++) {
-            String rootReplicaFolder = getReplicaStorageFolder(replicaId, i);
-            File rootDirFile = new File(rootReplicaFolder);
-            if (!rootDirFile.exists()) {
-                continue;
-            }
-            File[] dataverseFileList = rootDirFile.listFiles();
-            for (File dataverseFile : dataverseFileList) {
-                if (dataverseFile.isDirectory()) {
-                    File[] indexFileList = dataverseFile.listFiles();
-                    if (indexFileList != null) {
-                        for (File indexFile : indexFileList) {
-                            if (indexFile.isDirectory()) {
-                                File[] ioDevicesList = indexFile.listFiles();
-                                if (ioDevicesList != null) {
-                                    for (File ioDeviceFile : ioDevicesList) {
-                                        if (ioDeviceFile.isDirectory()) {
-                                            remoteIndexesPaths.add(ioDeviceFile);
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-            }
+        ClusterPartition[] partitions = nodePartitions.get(replicaId);
+        for (ClusterPartition partition : partitions) {
+            remoteIndexesPaths.addAll(getPartitionIndexes(partition.getPartitionId()));
         }
         return remoteIndexesPaths;
     }
@@ -237,41 +145,60 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         }
         long minRemoteLSN = Long.MAX_VALUE;
         for (String replica : replicaIds) {
-            try {
-                //for every index in replica
-                Set<File> remoteIndexes = getReplicaIndexes(replica);
-                for (File indexFolder : remoteIndexes) {
-                    //read LSN map
-                    try {
-                        //get max LSN per index
-                        long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
-
-                        //get min of all maximums
-                        minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
-                    } catch (IOException e) {
-                        LOGGER.log(Level.INFO, indexFolder.getAbsolutePath() + " Couldn't read LSN map for index "
-                                + indexFolder);
-                        continue;
-                    }
+            //for every index in replica
+            Set<File> remoteIndexes = getReplicaIndexes(replica);
+            for (File indexFolder : remoteIndexes) {
+                //read LSN map
+                try {
+                    //get max LSN per index
+                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+
+                    //get min of all maximums
+                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+                } catch (IOException e) {
+                    LOGGER.log(Level.INFO,
+                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+                    continue;
                 }
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
             }
         }
         lastMinRemoteLSN.set(minRemoteLSN);
         return minRemoteLSN;
     }
 
-    public HashMap<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN)
-            throws IOException {
-        HashMap<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
+    @Override
+    public long getPartitionsMinLSN(Integer[] partitions) {
+        long minRemoteLSN = Long.MAX_VALUE;
+        for (Integer partition : partitions) {
+            //for every index in partition
+            Set<File> remoteIndexes = getPartitionIndexes(partition);
+            for (File indexFolder : remoteIndexes) {
+                //read LSN map
+                try {
+                    //get max LSN per index
+                    long remoteIndexMaxLSN = getReplicaIndexMaxLSN(indexFolder);
+
+                    //get min of all maximums
+                    minRemoteLSN = Math.min(minRemoteLSN, remoteIndexMaxLSN);
+                } catch (IOException e) {
+                    LOGGER.log(Level.INFO,
+                            indexFolder.getAbsolutePath() + " Couldn't read LSN map for index " + indexFolder);
+                    continue;
+                }
+            }
+        }
+        return minRemoteLSN;
+    }
+
+    public Map<Long, String> getLaggingReplicaIndexesId2PathMap(String replicaId, long targetLSN) throws IOException {
+        Map<Long, String> laggingReplicaIndexes = new HashMap<Long, String>();
         try {
             //for every index in replica
             Set<File> remoteIndexes = getReplicaIndexes(replicaId);
             for (File indexFolder : remoteIndexes) {
                 if (getReplicaIndexMaxLSN(indexFolder) < targetLSN) {
-                    File localResource = new File(indexFolder + File.separator
-                            + PersistentLocalResourceRepository.METADATA_FILE_NAME);
+                    File localResource = new File(
+                            indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME);
                     LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
                     laggingReplicaIndexes.put(resource.getResourceId(), indexFolder.getAbsolutePath());
                 }
@@ -286,7 +213,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
     private long getReplicaIndexMaxLSN(File indexFolder) throws IOException {
         long remoteIndexMaxLSN = 0;
         //get max LSN per index
-        HashMap<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
+        Map<Long, Long> lsnMap = getReplicaIndexLSNMap(indexFolder.getAbsolutePath());
         if (lsnMap != null) {
             for (Long lsn : lsnMap.values()) {
                 remoteIndexMaxLSN = Math.max(remoteIndexMaxLSN, lsn);
@@ -296,7 +223,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         return remoteIndexMaxLSN;
     }
 
-    public void cleanInvalidLSMComponents(String replicaId) throws HyracksDataException {
+    public void cleanInvalidLSMComponents(String replicaId) {
         //for every index in replica
         Set<File> remoteIndexes = getReplicaIndexes(replicaId);
         for (File remoteIndexFile : remoteIndexes) {
@@ -312,7 +239,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         }
     }
 
-    private void deleteLSMComponentFilesForMask(File maskFile) {
+    private static void deleteLSMComponentFilesForMask(File maskFile) {
         String lsmComponentTimeStamp = maskFile.getName().substring(0,
                 maskFile.getName().length() - LSM_COMPONENT_MASK_SUFFIX.length());
         File indexFolder = maskFile.getParentFile();
@@ -325,78 +252,92 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public synchronized HashMap<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
-        FileInputStream fis = null;
-        ObjectInputStream oisFromFis = null;
-        try {
-            fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-            oisFromFis = new ObjectInputStream(fis);
+    @SuppressWarnings({ "unchecked" })
+    public synchronized Map<Long, Long> getReplicaIndexLSNMap(String indexPath) throws IOException {
+        try (FileInputStream fis = new FileInputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
             Map<Long, Long> lsnMap = null;
             try {
                 lsnMap = (Map<Long, Long>) oisFromFis.readObject();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
-            return (HashMap<Long, Long>) lsnMap;
-        } finally {
-            if (oisFromFis != null) {
-                oisFromFis.close();
-            }
-            if (oisFromFis == null && fis != null) {
-                fis.close();
-            }
+            return lsnMap;
         }
     }
 
-    public synchronized void updateReplicaIndexLSNMap(String indexPath, HashMap<Long, Long> lsnMap) throws IOException {
-        FileOutputStream fos = null;
-        ObjectOutputStream oosToFos = null;
-        try {
-            fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
-            oosToFos = new ObjectOutputStream(fos);
+    public synchronized void updateReplicaIndexLSNMap(String indexPath, Map<Long, Long> lsnMap) throws IOException {
+        try (FileOutputStream fos = new FileOutputStream(indexPath + File.separator + REPLICA_INDEX_LSN_MAP_NAME);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(lsnMap);
             oosToFos.flush();
             lastMinRemoteLSN.set(-1);
-        } finally {
-            if (oosToFos != null) {
-                oosToFos.close();
+        }
+    }
+
+    /**
+     * @param partition id of the partition whose storage root is listed (sub-directories, then their entries)
+     * @return Set of file references to each index in the partition; empty if its storage root is not a directory
+     */
+    public Set<File> getPartitionIndexes(int partition) {
+        Set<File> partitionIndexes = new HashSet<File>();
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        String partitionStoragePath = localRepository.getPartitionPath(partition)
+                + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition);
+        File partitionRoot = new File(partitionStoragePath);
+        if (partitionRoot.exists() && partitionRoot.isDirectory()) {
+            File[] dataverseFileList = partitionRoot.listFiles();
+            if (dataverseFileList != null) {
+                for (File dataverseFile : dataverseFileList) {
+                    if (dataverseFile.isDirectory()) {
+                        File[] indexFileList = dataverseFile.listFiles();
+                        if (indexFileList != null) {
+                            for (File indexFile : indexFileList) {
+                                partitionIndexes.add(indexFile);
+                            }
+                        }
+                    }
+                }
             }
-            if (oosToFos == null && fos != null) {
-                fos.close();
+        }
+        return partitionIndexes;
+    }
+
+    /**
+     * @param partition id of the partition whose index directories are listed
+     * @return Absolute paths to all partition files accepted by LSM_INDEX_FILES_FILTER
+     */
+    public List<String> getPartitionIndexesFiles(int partition) {
+        List<String> partitionFiles = new ArrayList<String>();
+        Set<File> partitionIndexes = getPartitionIndexes(partition);
+        for (File indexDir : partitionIndexes) {
+            if (indexDir.isDirectory()) {
+                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+                if (indexFiles != null) {
+                    for (File file : indexFiles) {
+                        partitionFiles.add(file.getAbsolutePath());
+                    }
+                }
             }
         }
+        return partitionFiles;
     }
 
     private static final FilenameFilter LSM_COMPONENTS_MASKS_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
-                return true;
-            } else {
-                return false;
-            }
+            return name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_COMPONENTS_NON_MASKS_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (!name.endsWith(LSM_COMPONENT_MASK_SUFFIX)) {
-                return true;
-            } else {
-                return false;
-            }
+            return !name.endsWith(LSM_COMPONENT_MASK_SUFFIX);
         }
     };
 
     private static final FilenameFilter LSM_INDEX_FILES_FILTER = new FilenameFilter() {
         public boolean accept(File dir, String name) {
-            if (name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME)) {
-                return true;
-            } else if (!name.startsWith(".")) {
-                return true;
-            } else {
-                return false;
-            }
+            return name.equalsIgnoreCase(PersistentLocalResourceRepository.METADATA_FILE_NAME) || !name.startsWith(".");
         }
     };
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/data/fbu.adm
----------------------------------------------------------------------
diff --git a/asterix-replication/src/test/resources/data/fbu.adm b/asterix-replication/src/test/resources/data/fbu.adm
new file mode 100644
index 0000000..7e99ea4
--- /dev/null
+++ b/asterix-replication/src/test/resources/data/fbu.adm
@@ -0,0 +1,10 @@
+{"id":1,"alias":"Margarita","name":"MargaritaStoddard","user-since":datetime("2012-08-20T10:10:00"),"friend-ids":{{2,3,6,10}},"employment":[{"organization-name":"Codetechno","start-date":date("2006-08-06")}]}
+{"id":2,"alias":"Isbel","name":"IsbelDull","user-since":datetime("2011-01-22T10:10:00"),"friend-ids":{{1,4}},"employment":[{"organization-name":"Hexviafind","start-date":date("2010-04-27")}]}
+{"id":3,"alias":"Emory","name":"EmoryUnk","user-since":datetime("2012-07-10T10:10:00"),"friend-ids":{{1,5,8,9}},"employment":[{"organization-name":"geomedia","start-date":date("2010-06-17"),"end-date":date("2010-01-26")}]}
+{"id":4,"alias":"Nicholas","name":"NicholasStroh","user-since":datetime("2010-12-27T10:10:00"),"friend-ids":{{2}},"employment":[{"organization-name":"Zamcorporation","start-date":date("2010-06-08")}]}
+{"id":5,"alias":"Von","name":"VonKemble","user-since":datetime("2010-01-05T10:10:00"),"friend-ids":{{3,6,10}},"employment":[{"organization-name":"Kongreen","start-date":date("2010-11-27")}]}
+{"id":6,"alias":"Willis","name":"WillisWynne","user-since":datetime("2005-01-17T10:10:00"),"friend-ids":{{1,3,7}},"employment":[{"organization-name":"jaydax","start-date":date("2009-05-15")}]}
+{"id":7,"alias":"Suzanna","name":"SuzannaTillson","user-since":datetime("2012-08-07T10:10:00"),"friend-ids":{{6}},"employment":[{"organization-name":"Labzatron","start-date":date("2011-04-19")}]}
+{"id":8,"alias":"Nila","name":"NilaMilliron","user-since":datetime("2008-01-01T10:10:00"),"friend-ids":{{3}},"employment":[{"organization-name":"Plexlane","start-date":date("2010-02-28")}]}
+{"id":9,"alias":"Woodrow","name":"WoodrowNehling","user-since":datetime("2005-09-20T10:10:00"),"friend-ids":{{3,10}},"employment":[{"organization-name":"Zuncan","start-date":date("2003-04-22"),"end-date":date("2009-12-13")}]}
+{"id":10,"alias":"Bram","name":"BramHatch","user-since":datetime("2010-10-16T10:10:00"),"friend-ids":{{1,5,9}},"employment":[{"organization-name":"physcane","start-date":date("2007-06-05"),"end-date":date("2011-11-05")}]}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/scripts/delete_storage.sh
----------------------------------------------------------------------
diff --git a/asterix-replication/src/test/resources/scripts/delete_storage.sh b/asterix-replication/src/test/resources/scripts/delete_storage.sh
new file mode 100755
index 0000000..129030b
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/delete_storage.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+rm -rf ~/asterix/*

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
----------------------------------------------------------------------
diff --git a/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
new file mode 100755
index 0000000..2582713
--- /dev/null
+++ b/asterix-replication/src/test/resources/scripts/kill_cc_and_nc.sh
@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+jps | awk '{if ($2 == "NCDriver" || $2 == "CCDriver") print $1;}' | xargs -n 1 kill -9

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 52fd806..9f59148 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -32,9 +32,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.replication.AsterixReplicationJob;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.commons.io.FileUtils;
@@ -63,10 +66,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     private IReplicationManager replicationManager;
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
+    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
 
-    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId) throws HyracksDataException {
+    public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
+            AsterixMetadataProperties metadataProperties) throws HyracksDataException {
         mountPoints = new String[devices.size()];
         this.nodeId = nodeId;
+        this.clusterPartitions = metadataProperties.getClusterPartitions();
         for (int i = 0; i < mountPoints.length; i++) {
             String mountPoint = devices.get(i).getPath().getPath();
             File mountPointDir = new File(mountPoint);
@@ -156,37 +162,18 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             resourceCache.put(resource.getResourcePath(), resource);
         }
 
-        FileOutputStream fos = null;
-        ObjectOutputStream oosToFos = null;
-
-        try {
-            fos = new FileOutputStream(resourceFile);
-            oosToFos = new ObjectOutputStream(fos);
+        try (FileOutputStream fos = new FileOutputStream(resourceFile);
+                ObjectOutputStream oosToFos = new ObjectOutputStream(fos)) {
             oosToFos.writeObject(resource);
             oosToFos.flush();
         } catch (IOException e) {
             throw new HyracksDataException(e);
-        } finally {
-            if (oosToFos != null) {
-                try {
-                    oosToFos.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-            if (oosToFos == null && fos != null) {
-                try {
-                    fos.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
+        }
 
-            //if replication enabled, send resource metadata info to remote nodes
-            if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
-                String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
-                createReplicationJob(ReplicationOperation.REPLICATE, filePath);
-            }
+        //if replication enabled, send resource metadata info to remote nodes
+        if (isReplicationEnabled && resource.getResourceId() != STORAGE_LOCAL_RESOURCE_ID) {
+            String filePath = getFileName(resource.getResourcePath(), resource.getResourceId());
+            createReplicationJob(ReplicationOperation.REPLICATE, filePath);
         }
     }
 
@@ -304,31 +291,12 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     }
 
     public static LocalResource readLocalResource(File file) throws HyracksDataException {
-        FileInputStream fis = null;
-        ObjectInputStream oisFromFis = null;
-
-        try {
-            fis = new FileInputStream(file);
-            oisFromFis = new ObjectInputStream(fis);
+        try (FileInputStream fis = new FileInputStream(file);
+                ObjectInputStream oisFromFis = new ObjectInputStream(fis)) {
             LocalResource resource = (LocalResource) oisFromFis.readObject();
             return resource;
         } catch (Exception e) {
             throw new HyracksDataException(e);
-        } finally {
-            if (oisFromFis != null) {
-                try {
-                    oisFromFis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-            if (oisFromFis == null && fis != null) {
-                try {
-                    fis.close();
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
         }
     }
 
@@ -427,4 +395,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         }
         return storageRootDir;
     }
+
+    /**
+     * @param partition id of a partition present in the cluster partitions map (absence is not checked)
+     * @return The partition local path on this NC, i.e. the mount point of the partition's IO device.
+     */
+    public String getPartitionPath(int partition) {
+        //currently each partition is replicated on the same IO device number on all NCs.
+        return mountPoints[clusterPartitions.get(partition).getIODeviceNum()];
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index b6bb7dc..e79a0d3 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
@@ -26,14 +27,17 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 public class PersistentLocalResourceRepositoryFactory implements ILocalResourceRepositoryFactory {
     private final IIOManager ioManager;
     private final String nodeId;
+    private final AsterixMetadataProperties metadataProperties;
 
-    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId) {
+    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, String nodeId,
+            AsterixMetadataProperties metadataProperties) {
         this.ioManager = ioManager;
         this.nodeId = nodeId;
+        this.metadataProperties = metadataProperties;
     }
 
     @Override
     public ILocalResourceRepository createRepository() throws HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId);
+        return new PersistentLocalResourceRepository(ioManager.getIODevices(), nodeId, metadataProperties);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index 701e529..8fe75f2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -44,13 +44,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -72,14 +70,11 @@ import org.apache.asterix.transaction.management.service.transaction.Transaction
 import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.common.util.IndexFileNameUtil;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -300,7 +295,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
             //get datasetLifeCycleManager
             IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                     .getDatasetLifecycleManager();
-            IIOManager ioManager = appRuntimeContext.getIOManager();
             ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
             Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
                     .loadAndGetAllResources();
@@ -507,13 +501,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         //#. get indexLifeCycleManager 
         IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
-        IIOManager ioManager = appRuntimeContext.getIOManager();
-        SortedMap<Integer, ClusterPartition> clusterPartitions = ((IAsterixPropertiesProvider) appRuntimeContext
-                .getAppContext()).getMetadataProperties().getClusterPartitions();
         IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();
-        ILocalResourceRepository localResourceRepository = appRuntimeContext.getLocalResourceRepository();
-        Map<Long, LocalResource> resourcesMap = ((PersistentLocalResourceRepository) localResourceRepository)
-                .loadAndGetAllResources();
+        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+                .getLocalResourceRepository();
+        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
         //#. set log reader to the lowWaterMarkLsn again.
         for (int i = 0; i < remoteLogs.size(); i++) {
             logRecord = remoteLogs.get(i);
@@ -561,16 +552,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                             //get index instance from IndexLifeCycleManager
                             //if index is not registered into IndexLifeCycleManager,
                             //create the index using LocalMetadata stored in LocalResourceRepository
-                            //get the resource path relative to this node
-                            int resourcePartition = localResource.getPartition();
-                            //get partition io device id
-                            //NOTE:
-                            //currently we store all partition in the same IO device in all nodes. If this changes,
-                            //this needs to be updated to find the IO device in which the partition is stored in this local node.
-                            int ioDevice = clusterPartitions.get(resourcePartition).getIODeviceNum();
-                            String resourceAbsolutePath = ioManager
-                                    .getAbsoluteFileRef(ioDevice, localResource.getResourceName()).getFile()
-                                    .getAbsolutePath();
+                            //get partition path in this node
+                            String partitionIODevicePath = localResourceRepository
+                                    .getPartitionPath(localResource.getPartition());
+                            String resourceAbsolutePath = partitionIODevicePath + File.separator
+                                    + localResource.getResourceName();
                             localResource.setResourcePath(resourceAbsolutePath);
                             index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
                             if (index == null) {
@@ -578,8 +564,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
                                 localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
                                 index = localResourceMetadata.createIndexInstance(appRuntimeContext,
                                         resourceAbsolutePath, localResource.getPartition());
-                                datasetLifecycleManager.register(localResource.getResourceName(), index);
-                                datasetLifecycleManager.open(localResource.getResourceName());
+                                datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                datasetLifecycleManager.open(resourceAbsolutePath);
 
                                 //#. get maxDiskLastLSN
                                 ILSMIndex lsmIndex = index;
@@ -1099,6 +1085,242 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
         }
     }
 
+    //Replays the logs of failedNode from lowWaterMarkLSN on: an analysis pass collects the winners, a second pass redoes their updates.
+    //TODO (mhubail) RecoveryManager has three methods that perform logs REDO based on different parameters; refactor them to use partitions only once the log format includes partition id.
+    @Override
+    public synchronized void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode)
+            throws IOException, ACIDException {
+        //delete any recovery files from previous failed recovery attempts
+        deleteRecoveryTemporaryFiles();
+
+        int updateLogCount = 0;
+        int entityCommitLogCount = 0;
+        int jobCommitLogCount = 0;
+        int redoCount = 0;
+        int abortLogCount = 0;
+        int jobId = -1;
+
+        state = SystemState.RECOVERING;
+        LOGGER.log(Level.INFO, "[RecoveryMgr] starting recovery ...");
+
+        Set<Integer> winnerJobSet = new HashSet<Integer>();
+        jobId2WinnerEntitiesMap = new HashMap<>();
+
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
+        JobEntityCommits jobEntityWinners = null;
+        //#. raise lowWaterMark (where analysis and redo start) to the smallest readable LSN if it is below it; no checkpoint file is read here
+        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
+        if (lowWaterMarkLSN < readableSmallestLSN) {
+            lowWaterMarkLSN = readableSmallestLSN;
+        }
+        //-------------------------------------------------------------------------
+        //  [ analysis phase ]
+        //  - collect the committed jobs and the entity commits logged by failedNode
+        //-------------------------------------------------------------------------
+        LOGGER.log(Level.INFO, "[RecoveryMgr] in analysis phase");
+        IAsterixAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
+        //get datasetLifeCycleManager
+        IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
+        PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appRuntimeContext
+                .getLocalResourceRepository();
+        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
+        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<Long, Long>();
+
+        //#. set log reader to the lowWaterMarkLsn
+        ILogReader logReader = logMgr.getLogReader(true);
+        ILogRecord logRecord = null;
+        try {
+            logReader.initializeScan(lowWaterMarkLSN);
+            logRecord = logReader.next();
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                //TODO filter by the partitions argument (currently unused) instead of node id once the log format is updated to include partitions
+                if (logRecord.getNodeId().equals(failedNode)) {
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                            updateLogCount++;
+                            break;
+                        case LogType.JOB_COMMIT:
+                            jobId = logRecord.getJobId();
+                            winnerJobSet.add(jobId);
+                            if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                                //to delete any spilled files as well
+                                jobEntityWinners.clear();
+                                jobId2WinnerEntitiesMap.remove(jobId);
+                            }
+                            jobCommitLogCount++;
+                            break;
+                        case LogType.ENTITY_COMMIT:
+                            jobId = logRecord.getJobId();
+                            if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = new JobEntityCommits(jobId);
+                                if (needToFreeMemory()) {
+                                    //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk.
+                                    //This could happen only when we have many jobs with small number of records and none of them have job commit.
+                                    freeJobsCachedEntities(jobId);
+                                }
+                                jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners);
+                            } else {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                            }
+                            jobEntityWinners.add(logRecord);
+                            entityCommitLogCount++;
+                            break;
+                        case LogType.ABORT:
+                            abortLogCount++;
+                            break;
+                        case LogType.FLUSH:
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
+                }
+                logRecord = logReader.next();
+            }
+
+            //prepare winners for search after analysis is done to flush anything remaining in memory to disk.
+            for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
+                winners.prepareForSearch();
+            }
+            //-------------------------------------------------------------------------
+            //  [ redo phase ]
+            //  - redo if
+            //    1) The TxnId is committed && --> guarantee durability
+            //    2) lsn > maxDiskLastLsn of the index --> guarantee idempotence
+            //-------------------------------------------------------------------------
+            LOGGER.info("[RecoveryMgr] in redo phase");
+
+            long resourceId;
+            long maxDiskLastLsn;
+            long LSN = -1;
+            ILSMIndex index = null;
+            LocalResource localResource = null;
+            ILocalResourceMetadata localResourceMetadata = null;
+            boolean foundWinner = false;
+            //set log reader to the lowWaterMarkLsn again.
+            logReader.initializeScan(lowWaterMarkLSN);
+            logRecord = logReader.next();
+            while (logRecord != null) {
+                if (IS_DEBUG_MODE) {
+                    LOGGER.info(logRecord.getLogRecordForDisplay());
+                }
+                //TODO update this to check for partitions instead of node id once the log format is updated to include partitions
+                if (logRecord.getNodeId().equals(failedNode)) {
+                    LSN = logRecord.getLSN();
+                    jobId = logRecord.getJobId();
+                    foundWinner = false;
+                    switch (logRecord.getLogType()) {
+                        case LogType.UPDATE:
+                            if (winnerJobSet.contains(jobId)) {
+                                foundWinner = true;
+                            } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
+                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
+                                tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
+                                        logRecord.getPKValue(), logRecord.getPKValueSize());
+                                if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
+                                    foundWinner = true;
+                                }
+                            }
+                            if (foundWinner) {
+                                resourceId = logRecord.getResourceId();
+                                localResource = resourcesMap.get(resourceId);
+                                /*******************************************************************
+                                 * [Notice]
+                                 * -> Issue
+                                 * Delete index may cause a problem during redo.
+                                 * The index operation to be redone couldn't be redone because the corresponding index
+                                 * may not exist in NC due to the possible index drop DDL operation.
+                                 * -> Approach
+                                 * Avoid the problem during redo.
+                                 * More specifically, the problem will be detected when the localResource of
+                                 * the corresponding index is retrieved, which will end up with 'null'.
+                                 * If null is returned, then just go and process the next
+                                 * log record.
+                                 *******************************************************************/
+                                if (localResource == null) {
+                                    logRecord = logReader.next();
+                                    continue;
+                                }
+                                /*******************************************************************/
+
+                                //get index instance from IndexLifeCycleManager
+                                //if index is not registered into IndexLifeCycleManager,
+                                //create the index using LocalMetadata stored in LocalResourceRepository
+                                //get partition path in this node
+                                String partitionIODevicePath = localResourceRepository
+                                        .getPartitionPath(localResource.getPartition());
+                                String resourceAbsolutePath = partitionIODevicePath + File.separator
+                                        + localResource.getResourceName();
+                                localResource.setResourcePath(resourceAbsolutePath);
+                                index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
+                                if (index == null) {
+                                    //#. create index instance and register to indexLifeCycleManager
+                                    localResourceMetadata = (ILocalResourceMetadata) localResource.getResourceObject();
+                                    index = localResourceMetadata.createIndexInstance(appRuntimeContext,
+                                            resourceAbsolutePath, localResource.getPartition());
+                                    datasetLifecycleManager.register(resourceAbsolutePath, index);
+                                    datasetLifecycleManager.open(resourceAbsolutePath);
+
+                                    //#. get maxDiskLastLSN
+                                    ILSMIndex lsmIndex = index;
+                                    try {
+                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
+                                                .getIOOperationCallback())
+                                                        .getComponentLSN(lsmIndex.getImmutableComponents());
+                                    } catch (HyracksDataException e) {
+                                        datasetLifecycleManager.close(resourceAbsolutePath);
+                                        throw e;
+                                    }
+
+                                    //#. set resourceId and maxDiskLastLSN to the map
+                                    resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
+                                } else {
+                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
+                                }
+
+                                if (LSN > maxDiskLastLsn) {
+                                    redo(logRecord, datasetLifecycleManager);
+                                    redoCount++;
+                                }
+                            }
+                            break;
+                        case LogType.JOB_COMMIT:
+                        case LogType.ENTITY_COMMIT:
+                        case LogType.ABORT:
+                        case LogType.FLUSH:
+                            //do nothing
+                            break;
+                        default:
+                            throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
+                    }
+                }
+                logRecord = logReader.next();
+            }
+
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("[RecoveryMgr] recovery is completed.");
+                LOGGER.info("[RecoveryMgr's recovery log count] update/entityCommit/jobCommit/abort/redo = "
+                        + updateLogCount + "/" + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount
+                        + "/" + redoCount);
+            }
+        } finally {
+            logReader.close();
+
+            //close the indexes recorded in resourceId2MaxLSNMap, i.e. the ones opened by the redo phase above
+            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
+            for (long r : resourceIdList) {
+                datasetLifecycleManager.close(resourcesMap.get(r).getResourcePath());
+            }
+
+            //delete any recovery files after completing recovery
+            deleteRecoveryTemporaryFiles();
+        }
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final int jobId;
@@ -1145,7 +1367,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 
         /**
          * Call this method when no more entity commits will be added to this job.
-         * 
+         *
          * @throws IOException
          */
         public void prepareForSearch() throws IOException {
@@ -1182,7 +1404,6 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
          */
         public ArrayList<File> getCandidiatePartitions(long logLSN) {
             ArrayList<File> candidiatePartitions = new ArrayList<File>();
-
             for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
                 String partitionName = partition.getName();
                 //entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN 


[3/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance

Posted by mh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
index 447e96d..f54db63 100644
--- a/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
+++ b/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
@@ -24,12 +24,14 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -54,6 +56,7 @@ import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.StringRequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.json.JSONObject;
 
 public class TestExecutor {
@@ -361,6 +364,14 @@ public class TestExecutor {
         return getProcessOutput(p);
     }
 
+    /**
+     * Runs a script on a vagrant node through "vagrant ssh" and returns everything it printed.
+     * The script is looked up under the SCRIPT_HOME entry of the environment of {@code pb}.
+     *
+     * @param pb
+     *            the process builder to reuse; its command is overwritten by this call
+     * @param node
+     *            the name of the vagrant node to run the script on
+     * @param scriptName
+     *            the script file name, appended as-is to SCRIPT_HOME
+     * @return the output of the script, decoded as UTF-8
+     * @throws Exception
+     *             if the process cannot be started, its output cannot be read, or the wait is interrupted
+     */
+    private static String executeVagrantScript(ProcessBuilder pb, String node, String scriptName) throws Exception {
+        pb.command("vagrant", "ssh", node, "--", pb.environment().get("SCRIPT_HOME") + scriptName);
+        Process p = pb.start();
+        //drain the output BEFORE waiting: a child that fills the stdout pipe buffer blocks on write
+        //and never exits, so calling waitFor() first could hang forever
+        String output = IOUtils.toString(p.getInputStream(), StandardCharsets.UTF_8.name());
+        p.waitFor();
+        return output;
+    }
+
     private static String getScriptPath(String queryPath, String scriptBasePath, String scriptFileName) {
         String targetWord = "queries" + File.separator;
         int targetWordSize = targetWord.lastIndexOf(File.separator);
@@ -565,6 +576,22 @@ public class TestExecutor {
                             }
                             System.err.println("...but that was expected.");
                             break;
+                        case "vagrant_script":
+                            try {
+                                String[] command = statement.trim().split(" ");
+                                if (command.length != 2) {
+                                    throw new Exception("invalid vagrant script format");
+                                }
+                                String nodeId = command[0];
+                                String scriptName = command[1];
+                                String output = executeVagrantScript(pb, nodeId, scriptName);
+                                if (output.contains("ERROR")) {
+                                    throw new Exception(output);
+                                }
+                            } catch (Exception e) {
+                                throw new Exception("Test \"" + testFile + "\" FAILED!\n", e);
+                            }
+                            break;
                         default:
                             throw new IllegalArgumentException("No statements of type " + ctx.getType());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
----------------------------------------------------------------------
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index 6085019..ce84cc8 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -315,10 +315,6 @@ public class PatternCreator {
         patternList.addAll(createRemoveAsterixLogDirPattern(instance).getPattern());
         patternList.addAll(createRemoveAsterixRootMetadata(instance).getPattern());
         patternList.addAll(createRemoveAsterixTxnLogs(instance).getPattern());
-        if (instance.getCluster().getDataReplication() != null
-                && instance.getCluster().getDataReplication().isEnabled()) {
-            patternList.addAll(createRemoveAsterixReplicationPattern(instance).getPattern());
-        }
         Patterns patterns = new Patterns(patternList);
         return patterns;
     }
@@ -647,29 +643,4 @@ public class PatternCreator {
         Patterns patterns = new Patterns(patternList);
         return patterns;
     }
-
-    private Patterns createRemoveAsterixReplicationPattern(AsterixInstance instance) throws Exception {
-
-        List<Pattern> patternList = new ArrayList<Pattern>();
-        Cluster cluster = instance.getCluster();
-
-        Nodeid nodeid = null;
-        String pargs = null;
-        Event event = null;
-        for (Node node : cluster.getNode()) {
-            String[] nodeIODevices;
-            String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
-            nodeIODevices = iodevices.trim().split(",");
-            for (String nodeIODevice : nodeIODevices) {
-                pargs = nodeIODevice + File.separator + cluster.getDataReplication().getReplicationStore();
-                nodeid = new Nodeid(new Value(null, node.getId()));
-                event = new Event("file_delete", nodeid, pargs);
-                patternList.add(new Pattern(null, 1, null, event));
-            }
-        }
-
-        Patterns patterns = new Patterns(patternList);
-        return patterns;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/pom.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/pom.xml b/asterix-installer/pom.xml
index 34c3e0a..b257e71 100644
--- a/asterix-installer/pom.xml
+++ b/asterix-installer/pom.xml
@@ -29,6 +29,7 @@
 		<failsafe.test.excludes>**/DmlRecoveryIT.java</failsafe.test.excludes>
         <cluster.test.excludes>**/AsterixClusterLifeCycleIT.java</cluster.test.excludes>
 		<cluster.extest.excludes>**/ClusterExecutionIT.java</cluster.extest.excludes>
+        <replication.test.excludes>**/ReplicationIT.java</replication.test.excludes>
 	</properties>
 
   <licenses>
@@ -123,6 +124,7 @@
 						<exclude>${failsafe.test.excludes}</exclude>
                         <exclude>${cluster.test.excludes}</exclude>
 					    <exclude>${cluster.extest.excludes}</exclude>
+                        <exclude>${replication.test.excludes}</exclude>
 					</excludes>
 				</configuration>
 				<executions>
@@ -194,6 +196,8 @@
                             <forkMode>pertest</forkMode>
                             <excludes>
                                 <exclude>${failsafe.test.excludes}</exclude>
+                                <exclude>${cluster.test.excludes}</exclude>
+                                <exclude>${cluster.extest.excludes}</exclude>
                             </excludes>
                         </configuration>
                         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 09c65c8..9559394 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -288,12 +288,6 @@ public class ValidateCommand extends AbstractCommand {
                 valid = false;
             }
 
-            if (cluster.getDataReplication().getReplicationStore() == null
-                    || cluster.getDataReplication().getReplicationStore().length() == 0) {
-                valid = false;
-                LOGGER.fatal("Replication store not defined. " + ERROR);
-            }
-
             if (cluster.getDataReplication().getReplicationPort() == null
                     || cluster.getDataReplication().getReplicationPort().toString().length() == 0) {
                 valid = false;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
new file mode 100644
index 0000000..86c15ae
--- /dev/null
+++ b/asterix-installer/src/test/java/org/apache/asterix/installer/test/ReplicationIT.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.installer.test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.logging.Logger;
+
+import org.apache.asterix.test.aql.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.plexus.util.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Integration test that runs the replication test suite found under {@code PATH_BASE} against a
+ * managix instance named "vagrant-ssh" on the vagrant nodes "cc", "nc1" and "nc2".
+ * The class is parameterized: one test is generated per test case context of the suite.
+ */
+@RunWith(Parameterized.class)
+public class ReplicationIT {
+
+    private static final String PATH_BASE = StringUtils
+            .join(new String[] { "src", "test", "resources", "integrationts", "replication" }, File.separator);
+    private static final String CLUSTER_BASE = StringUtils
+            .join(new String[] { "src", "test", "resources", "clusterts" }, File.separator);
+    private static final String PATH_ACTUAL = "repliationtest" + File.separator;
+    private static String managixFolderName;
+    private static final Logger LOGGER = Logger.getLogger(ReplicationIT.class.getName());
+    private static File asterixProjectDir = new File(System.getProperty("user.dir"));
+    private static final String CLUSTER_CC_ADDRESS = "10.10.0.2";
+    private static final int CLUSTER_CC_API_PORT = 19002;
+    private static ProcessBuilder pb;
+    private static Map<String, String> env;
+    private final static TestExecutor testExecutor = new TestExecutor(CLUSTER_CC_ADDRESS, CLUSTER_CC_API_PORT);
+    private static String SCRIPT_HOME;
+    protected TestCaseContext tcCtx;
+
+    public ReplicationIT(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    /**
+     * Prepares the cluster once for all tests: copies the test data, the test scripts and the installer
+     * binaries next to the cluster definition, makes the scripts executable on every node and runs
+     * "managix configure" and "managix validate".
+     *
+     * @throws Exception
+     *             if the installer binaries are missing or cannot be copied, or if validation reports an ERROR
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        File outdir = new File(PATH_ACTUAL);
+        outdir.mkdirs();
+
+        // vagrant setup
+        File installerTargetDir = new File(asterixProjectDir, "target");
+        String[] installerFiles = installerTargetDir.list(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                return new File(dir, name).isDirectory() && name.startsWith("asterix-installer")
+                        && name.endsWith("binary-assembly");
+            }
+        });
+
+        if (installerFiles == null || installerFiles.length == 0) {
+            throw new Exception("Couldn't find installer binaries");
+        }
+
+        managixFolderName = installerFiles[0];
+
+        //copy tests data
+        FileUtils.copyDirectoryStructure(
+                new File(StringUtils.join(
+                        new String[] { "..", "asterix-replication", "src", "test", "resources", "data" },
+                        File.separator)),
+                new File(StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "data" },
+                        File.separator)));
+
+        //copy tests scripts
+        FileUtils.copyDirectoryStructure(
+                new File(StringUtils.join(
+                        new String[] { "..", "asterix-replication", "src", "test", "resources", "scripts" },
+                        File.separator)),
+                new File(StringUtils.join(new String[] { "src", "test", "resources", "clusterts", "scripts" },
+                        File.separator)));
+
+        //the local copy has to be complete before the remote copy below reads the same folder
+        //(presumably /vagrant is the cluster-side view of CLUSTER_BASE), so drain cp's output and wait for it
+        Process localCopy = invoke("cp", "-r", installerTargetDir.toString() + "/" + managixFolderName,
+                asterixProjectDir + "/" + CLUSTER_BASE);
+        String localCopyOutput = processOut(localCopy);
+        if (localCopy.waitFor() != 0) {
+            throw new Exception("Couldn't copy installer binaries: " + localCopyOutput);
+        }
+
+        remoteInvoke("cp -r /vagrant/" + managixFolderName + " /tmp/asterix");
+
+        SCRIPT_HOME = "/vagrant/scripts/";
+        pb = new ProcessBuilder();
+        env = pb.environment();
+        env.put("SCRIPT_HOME", SCRIPT_HOME);
+        File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+        pb.directory(cwd);
+        pb.redirectErrorStream(true);
+
+        //make scripts executable
+        String chmodScriptsCmd = "chmod -R +x " + SCRIPT_HOME;
+        remoteInvoke(chmodScriptsCmd, "cc");
+        remoteInvoke(chmodScriptsCmd, "nc1");
+        remoteInvoke(chmodScriptsCmd, "nc2");
+
+        //managix configure
+        logOutput(managixInvoke("configure").getInputStream());
+
+        //managix validate
+        String validateOutput = IOUtils.toString(managixInvoke("validate").getInputStream(),
+                StandardCharsets.UTF_8.name());
+        if (validateOutput.contains("ERROR")) {
+            throw new Exception("Managix validate error: " + validateOutput);
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        //remove files
+        remoteInvoke("rm -rf /vagrant/asterix");
+    }
+
+    @Before
+    public void beforeTest() throws Exception {
+        //create instance; log what managix printed instead of discarding it
+        logOutput(managixInvoke("create -n vagrant-ssh -c /vagrant/cluster_with_replication.xml").getInputStream());
+    }
+
+    @After
+    public void afterTest() throws Exception {
+        //stop instance
+        managixInvoke("stop -n vagrant-ssh");
+
+        //make sure no CC/NC process is left running on any node
+        String killProcesses = "kill_cc_and_nc.sh";
+        executeVagrantScript("cc", killProcesses);
+        executeVagrantScript("nc1", killProcesses);
+        executeVagrantScript("nc2", killProcesses);
+
+        //delete storage
+        String deleteStorage = "delete_storage.sh";
+        executeVagrantScript("cc", deleteStorage);
+        executeVagrantScript("nc1", deleteStorage);
+        executeVagrantScript("nc2", deleteStorage);
+
+        //delete instance
+        managixInvoke("delete -n vagrant-ssh");
+    }
+
+    @Test
+    public void test() throws Exception {
+        testExecutor.executeTest(PATH_ACTUAL, tcCtx, pb, false);
+    }
+
+    /**
+     * @return one parameter array per test case context of the default test suite under {@code PATH_BASE}
+     */
+    @Parameters(name = "ReplicationIT {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        //the former fallback re-read the very same suite file when the result was empty, so it was a no-op
+        return buildTestsInXml(TestCaseContext.DEFAULT_TESTSUITE_XML_NAME);
+    }
+
+    protected static Collection<Object[]> buildTestsInXml(String xmlfile) throws Exception {
+        Collection<Object[]> testArgs = new ArrayList<Object[]>();
+        TestCaseContext.Builder b = new TestCaseContext.Builder();
+        for (TestCaseContext ctx : b.build(new File(PATH_BASE), xmlfile)) {
+            testArgs.add(new Object[] { ctx });
+        }
+        return testArgs;
+    }
+
+    /**
+     * @return true if the UTF-8 content of {@code input} contains {@code requiredSubString};
+     *         false if it does not or if the stream cannot be read
+     */
+    public static boolean checkOutput(InputStream input, String requiredSubString) {
+        String candidate;
+        try {
+            candidate = IOUtils.toString(input, StandardCharsets.UTF_8.name());
+        } catch (IOException e) {
+            LOGGER.warning("Could not check output of subprocess");
+            return false;
+        }
+        return candidate.contains(requiredSubString);
+    }
+
+    public static boolean checkOutput(String candidate, String requiredSubString) {
+        return candidate.contains(requiredSubString);
+    }
+
+    /**
+     * @return the whole output of {@code p}, decoded as UTF-8
+     */
+    public static String processOut(Process p) throws IOException {
+        InputStream input = p.getInputStream();
+        return IOUtils.toString(input, StandardCharsets.UTF_8.name());
+    }
+
+    public static void logOutput(InputStream input) {
+        try {
+            LOGGER.info(IOUtils.toString(input, StandardCharsets.UTF_8.name()));
+        } catch (IOException e) {
+            LOGGER.warning("Could not print output of subprocess");
+        }
+    }
+
+    /**
+     * Starts a local process with stderr merged into stdout and returns WITHOUT waiting for it;
+     * callers that depend on its result must drain its output and wait for it themselves.
+     */
+    private static Process invoke(String... args) throws Exception {
+        ProcessBuilder pb = new ProcessBuilder(args);
+        pb.redirectErrorStream(true);
+        Process p = pb.start();
+        return p;
+    }
+
+    /**
+     * Runs {@code cmd} on the "cc" node with MANAGIX_HOME set and waits for it to finish.
+     */
+    private static Process remoteInvoke(String cmd) throws Exception {
+        return runInClusterDir("vagrant", "ssh", "cc", "-c", "MANAGIX_HOME=/tmp/asterix/ " + cmd);
+    }
+
+    /**
+     * Runs {@code cmd} on the given vagrant node and waits for it to finish.
+     */
+    private static Process remoteInvoke(String cmd, String node) throws Exception {
+        return runInClusterDir("vagrant", "ssh", node, "--", cmd);
+    }
+
+    /**
+     * Runs a command from the CLUSTER_BASE directory with stderr merged into stdout and waits for it.
+     * NOTE(review): the output is only read by the callers after the wait, so a command that prints more
+     * than the pipe buffer holds would block here; the callers in this class only run short commands.
+     */
+    private static Process runInClusterDir(String... command) throws Exception {
+        ProcessBuilder builder = new ProcessBuilder(command);
+        File cwd = new File(asterixProjectDir.toString() + "/" + CLUSTER_BASE);
+        builder.directory(cwd);
+        builder.redirectErrorStream(true);
+        Process p = builder.start();
+        p.waitFor();
+        return p;
+    }
+
+    private static Process managixInvoke(String cmd) throws Exception {
+        return remoteInvoke("/tmp/asterix/bin/managix " + cmd);
+    }
+
+    /**
+     * Runs a script from SCRIPT_HOME on the given vagrant node and returns its output decoded as UTF-8.
+     */
+    private static String executeVagrantScript(String node, String scriptName) throws Exception {
+        pb.command("vagrant", "ssh", node, "--", SCRIPT_HOME + scriptName);
+        Process p = pb.start();
+        //drain the output before waiting so that a script blocked on a full stdout pipe cannot hang the test
+        String output = IOUtils.toString(p.getInputStream(), StandardCharsets.UTF_8.name());
+        p.waitFor();
+        return output;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml b/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
new file mode 100644
index 0000000..b9f4658
--- /dev/null
+++ b/asterix-installer/src/test/resources/clusterts/cluster_with_replication.xml
@@ -0,0 +1,63 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+
+    <name>vagrant</name>
+
+    <username>vagrant</username>
+
+    <working_dir>
+        <dir>/vagrant/asterix/managix-working</dir>
+        <NFS>true</NFS>
+    </working_dir>
+
+    <log_dir>/home/vagrant/asterix/logs/</log_dir>
+    <txn_log_dir>/home/vagrant/asterix/tx_logs</txn_log_dir>
+    <iodevices>/home/vagrant/asterix/p1,/home/vagrant/asterix/p2</iodevices>
+
+    <store>storage</store>
+
+    <java_home>/usr/java/latest</java_home>
+    <metadata_node>nc1</metadata_node>
+
+    <data_replication>
+        <enabled>true</enabled>
+        <replication_port>2000</replication_port>
+        <replication_factor>2</replication_factor>
+        <auto_failover>true</auto_failover>
+        <replication_time_out>10</replication_time_out>
+    </data_replication>
+
+    <master_node>
+        <id>cc</id>
+        <client_ip>10.10.0.2</client_ip>
+        <cluster_ip>10.10.0.2</cluster_ip>
+        <client_port>1098</client_port>
+        <cluster_port>1099</cluster_port>
+        <http_port>8888</http_port>
+    </master_node>
+    <node>
+        <id>nc1</id>
+        <cluster_ip>10.10.0.3</cluster_ip>
+    </node>
+    <node>
+        <id>nc2</id>
+        <cluster_ip>10.10.0.4</cluster_ip>
+    </node>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/clusterts/known_hosts
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/clusterts/known_hosts b/asterix-installer/src/test/resources/clusterts/known_hosts
index 5ab452a..273f9f3 100644
--- a/asterix-installer/src/test/resources/clusterts/known_hosts
+++ b/asterix-installer/src/test/resources/clusterts/known_hosts
@@ -1,6 +1,6 @@
-nc1,10.10.0.3 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-nc2,10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-nc3,10.10.0.5 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-127.0.0.1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
 ::1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
-10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA4DTPG2/P073Ak5htIlGYxFh9BYo3bwxBW/SZ1+MDtBUGSEFG0wP4dDRNxwzdmc2EB5JbiAb4t2wLtlFJUbhZUuhxxv2WP4Uastt+U2CWe8+/OsSCAieDv9+dvhT2YxkyAwSCFjyv9T+ftE7YyuIqgBoDTcsvCGzhcl80xd/mQuboneYdR8A0Q2cbd47BQ1D+76HW3l0t51N4fsvbds/LdkXtqVqCadiTKuZ/Ki9JkWlIwffF1IvtARdkW5hyA2fkdJcNBGcfIlxcEvXUvEcIi2FVdY+rJTR9014SrspPtfTyPe4mMgwNQGGT1dm9tnjhq/N6WrYOQ8J3HUn+r7tO5w==
+127.0.0.1 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+nc1,10.10.0.3 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+nc2,10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+10.10.0.4 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
+10.10.0.2 ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEA9gfVcSU968DdflnfJkup3em0MFX13uUTxsiaPisyahjcc++d+9yo+OL5Avffznal9Ev9hzptYclUw9Rx1fSs09g6w8qRiaRLHhYw3mgAer6vG9NZxaj2i2TAVUjdPKZDOSgtFzvPZiwWfhdqi6CmzMRi3MO5G3LzIRzB+qjMitgJ4p/R0mLTik40KTBQNxgN5EmegoANjyBDbdvBKf4vQvMwYGvByq4rMDyDCC8JgcR+B1kR8xNKwDYZFokuSrmAw6PXUmjyW6FaWaUFvrqaj64l1yHqLPcZDTbcSzum6+RaR5Gg2upfs3ahICOzzlnwnwtZxznWKU+rNd4T29cHOw==
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql
new file mode 100644
index 0000000..6a2441e
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.1.ddl.aql
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that bulk-loaded LSM components are replicated correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover completes, then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+    organization-name: string,
+    start-date: date,
+    end-date: date?
+}
+
+create type FacebookUserType as closed {
+    id: int,
+    alias: string,
+    name: string,
+    user-since: datetime,
+    friend-ids: {{ int32 }},
+    employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets  ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
new file mode 100644
index 0000000..ae14ad0
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.2.update.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that bulk-loaded LSM components are replicated correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover completes, then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql
new file mode 100644
index 0000000..9c8cb96
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.3.txnqbc.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that bulk-loaded LSM components are replicated correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover completes, then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsers return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.4.vagrant_script.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.5.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql
new file mode 100644
index 0000000..9c8cb96
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/bulkload/bulkload.6.txnqar.aql
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : bulkload.aql
+ * Description     : Check that bulk-loaded LSM components are replicated correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, query data, kill one node
+                     and wait until the failover completes, then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsers return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql
new file mode 100644
index 0000000..8de2067
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.1.ddl.aql
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that in-memory LSM components are replicated and recovered correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset, query
+                     data from memory, kill one node and wait until the failover completes,
+                     then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+    organization-name: string,
+    start-date: date,
+    end-date: date?
+}
+
+create type FacebookUserType as closed {
+    id: int,
+    alias: string,
+    name: string,
+    user-since: datetime,
+    friend-ids: {{ int32 }},
+    employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets  ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
+
+create dataset FacebookUsersInMemory(FacebookUserType)
+primary key id;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
new file mode 100644
index 0000000..8087689
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.2.update.aql
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that in-memory LSM components are replicated and recovered correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset, query
+                     data from memory, kill one node and wait until the failover completes,
+                     then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+use dataverse TinySocial;
+
+load dataset FacebookUsers using localfs
+(("path"="vagrant-ssh_nc1:///vagrant/data/fbu.adm"),("format"="adm"));
+
+insert into dataset TinySocial.FacebookUsersInMemory(for $x in dataset TinySocial.FacebookUsers return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql
new file mode 100644
index 0000000..e25e409
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.3.txnqbc.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that in-memory LSM components are replicated and recovered correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset, query
+                     data from memory, kill one node and wait until the failover completes,
+                     then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
new file mode 100644
index 0000000..5695ed7
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.4.vagrant_script.aql
@@ -0,0 +1 @@
+nc2 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.5.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql
new file mode 100644
index 0000000..e25e409
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/mem_component_recovery/mem_component_recovery.6.txnqar.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : mem_component_recovery.aql
+ * Description     : Check that in-memory LSM components are replicated and recovered correctly.
+                     The test goes as follows:
+                     start 2 nodes, bulkload a dataset, copy it to an in-memory dataset, query
+                     data from memory, kill one node and wait until the failover completes,
+                     then query the data again.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+count (for $x in dataset FacebookUsersInMemory return $x);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql
new file mode 100644
index 0000000..113d144
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.1.ddl.aql
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : metadata_node.aql
+ * Description     : Check that metadata node failover is done correctly.
+                     The test goes as follows:
+                     start 2 nodes, create a dataset, kill the metadata node
+                     and wait until the failover completes, then verify the
+                     dataset still exists.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+drop dataverse TinySocial if exists;
+create dataverse TinySocial;
+use dataverse TinySocial;
+
+create type EmploymentType as open {
+    organization-name: string,
+    start-date: date,
+    end-date: date?
+}
+
+create type FacebookUserType as closed {
+    id: int,
+    alias: string,
+    name: string,
+    user-since: datetime,
+    friend-ids: {{ int32 }},
+    employment: [EmploymentType]
+}
+
+/********* 2. Create Datasets  ***********/
+use dataverse TinySocial;
+
+drop dataset FacebookUsers if exists;
+
+create dataset FacebookUsers(FacebookUserType)
+primary key id;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql
new file mode 100644
index 0000000..76bdcfe
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.2.txnqbc.aql
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : metadata_node_recovery.aql
+ * Description     : Check that metadata node failover is done correctly.
+                     The test goes as follows:
+                     start 2 nodes, create a dataset, kill metadata node
+                     and wait until the failover completes, verify the
+                     dataset still exists.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+for $x in dataset Metadata.Dataset where $x.DatasetName ='FacebookUsers' return $x.DatasetName;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
new file mode 100644
index 0000000..5eec164
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.3.vagrant_script.aql
@@ -0,0 +1 @@
+nc1 kill_cc_and_nc.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql
new file mode 100644
index 0000000..51b0e76
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.4.sleep.aql
@@ -0,0 +1 @@
+60000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql
new file mode 100644
index 0000000..ac1c593
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/queries/failover/metadata_node/metadata_node.5.txnqar.aql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Test case Name  : metadata_node_recovery.aql
+ * Description     : Check that metadata node failover is done correctly.
+                     The test goes as follows:
+                     start 2 nodes, create a dataset, kill metadata node
+                     and wait until the failover completes, verify the
+                     dataset still exists.
+ * Expected Result : Success
+ * Date            : January 6 2016
+ */
+
+use dataverse TinySocial;
+
+for $x in dataset Metadata.Dataset where $x.DatasetName ='FacebookUsers' return $x.DatasetName;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
----------------------------------------------------------------------
diff --git a/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
new file mode 100644
index 0000000..f033086
--- /dev/null
+++ b/asterix-installer/src/test/resources/integrationts/replication/testsuite.xml
@@ -0,0 +1,37 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
+  <test-group name="failover">
+    <test-case FilePath="failover">
+      <compilation-unit name="bulkload">
+        <output-dir compare="Text">bulkload</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failover">
+      <compilation-unit name="mem_component_recovery">
+        <output-dir compare="Text">mem_component_recovery</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="failover">
+      <compilation-unit name="metadata_node">
+        <output-dir compare="Text">metadata_node</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index aa7f7d5..823e861 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -383,6 +383,12 @@ public class MetadataBootstrap {
             dataLifecycleManager.register(absolutePath, lsmBtree);
         } else {
             final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
+            if (resource == null) {
+                throw new Exception("Could not find required metadata indexes. Please delete "
+                        + propertiesProvider.getMetadataProperties().getTransactionLogDirs()
+                                .get(runtimeContext.getTransactionSubsystem().getId())
+                        + " to intialize as a new instance. (WARNING: all data will be lost.)");
+            }
             resourceID = resource.getResourceId();
             lsmBtree = (LSMBTree) dataLifecycleManager.getIndex(absolutePath);
             if (lsmBtree == null) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index b6f3c9e..601ce15 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -142,4 +142,10 @@ public class SplitsAndConstraintsUtil {
         FileSplit[] splits = splitsForFilesIndex(mdTxnCtx, dataverseName, datasetName, targetIdxName, create);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
+
+    public static String getIndexPath(String partitionPath, int partition, String dataverse, String fullIndexName) {
+        String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
+        return partitionPath + StoragePathUtil.prepareStoragePartitionPath(storageDirName, partition) + File.separator
+                + StoragePathUtil.prepareDataverseIndexName(dataverse, fullIndexName);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
index 866162b..082618b 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixAppContextInfo.java
@@ -20,10 +20,11 @@ package org.apache.asterix.om.util;
 
 import java.util.logging.Logger;
 
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixCompilerProperties;
 import org.apache.asterix.common.config.AsterixExternalProperties;
 import org.apache.asterix.common.config.AsterixFeedProperties;
-import org.apache.asterix.common.config.AsterixBuildProperties;
 import org.apache.asterix.common.config.AsterixMetadataProperties;
 import org.apache.asterix.common.config.AsterixPropertiesAccessor;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
@@ -56,12 +57,13 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
     private AsterixFeedProperties feedProperties;
     private AsterixBuildProperties buildProperties;
     private AsterixReplicationProperties replicationProperties;
-
+    private final IGlobalRecoveryMaanger globalRecoveryMaanger;
     private IHyracksClientConnection hcc;
 
-    public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc) throws AsterixException {
+    public static void initialize(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+            IGlobalRecoveryMaanger globalRecoveryMaanger) throws AsterixException {
         if (INSTANCE == null) {
-            INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc);
+            INSTANCE = new AsterixAppContextInfo(ccAppCtx, hcc, globalRecoveryMaanger);
         }
         AsterixPropertiesAccessor propertiesAccessor = new AsterixPropertiesAccessor();
         INSTANCE.compilerProperties = new AsterixCompilerProperties(propertiesAccessor);
@@ -77,9 +79,11 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
         Logger.getLogger("org.apache").setLevel(INSTANCE.externalProperties.getLogLevel());
     }
 
-    private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc) {
+    private AsterixAppContextInfo(ICCApplicationContext ccAppCtx, IHyracksClientConnection hcc,
+            IGlobalRecoveryMaanger globalRecoveryMaanger) {
         this.appCtx = ccAppCtx;
         this.hcc = hcc;
+        this.globalRecoveryMaanger = globalRecoveryMaanger;
     }
 
     public static AsterixAppContextInfo getInstance() {
@@ -144,4 +148,9 @@ public class AsterixAppContextInfo implements IAsterixApplicationContextInfo, IA
     public AsterixReplicationProperties getReplicationProperties() {
         return replicationProperties;
     }
+
+    @Override
+    public IGlobalRecoveryMaanger getGlobalRecoveryManager() {
+        return globalRecoveryMaanger;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
----------------------------------------------------------------------
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 80008c5..c2c28df 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -35,6 +35,12 @@ import javax.xml.bind.Unmarshaller;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -45,6 +51,11 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConst
  */
 
 public class AsterixClusterProperties {
+    /**
+     * TODO: currently after instance restarts we require all nodes to join again, otherwise the cluster won't be ACTIVE.
+     * We may overcome this by storing the cluster state before the instance shutdown and using it on startup to identify
+     * the nodes that are expected to join.
+     */
 
     private static final Logger LOGGER = Logger.getLogger(AsterixClusterProperties.class.getName());
 
@@ -63,6 +74,12 @@ public class AsterixClusterProperties {
 
     private Map<String, ClusterPartition[]> node2PartitionsMap = null;
     private SortedMap<Integer, ClusterPartition> clusterPartitions = null;
+    private Map<Long, TakeoverPartitionsRequestMessage> pendingTakeoverRequests = null;
+
+    private long takeoverRequestId = 0;
+    private String currentMetadataNode = null;
+    private boolean isMetadataNodeActive = false;
+    private boolean autoFailover = false;
 
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -82,6 +99,13 @@ public class AsterixClusterProperties {
             if (AsterixAppContextInfo.getInstance().getCCApplicationContext() != null) {
                 node2PartitionsMap = AsterixAppContextInfo.getInstance().getMetadataProperties().getNodePartitions();
                 clusterPartitions = AsterixAppContextInfo.getInstance().getMetadataProperties().getClusterPartitions();
+                currentMetadataNode = AsterixAppContextInfo.getInstance().getMetadataProperties().getMetadataNodeName();
+                if (isAutoFailoverEnabled()) {
+                    autoFailover = cluster.getDataReplication().isAutoFailover();
+                }
+                if (autoFailover) {
+                    pendingTakeoverRequests = new HashMap<>();
+                }
             }
         }
     }
@@ -91,18 +115,24 @@ public class AsterixClusterProperties {
     public synchronized void removeNCConfiguration(String nodeId) {
         updateNodePartitions(nodeId, false);
         ncConfiguration.remove(nodeId);
+        if (nodeId.equals(currentMetadataNode)) {
+            isMetadataNodeActive = false;
+            LOGGER.info("Metadata node is now inactive");
+        }
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" Removing configuration parameters for node id " + nodeId);
+            LOGGER.info("Removing configuration parameters for node id " + nodeId);
+        }
+        if (autoFailover) {
+            requestPartitionsTakeover(nodeId);
         }
-        // TODO implement fault tolerance as follows:
-        // 1. collect the partitions of the failed NC
-        // 2. For each partition, request a remote replica to take over.
-        // 3. wait until each remote replica completes the recovery for the lost partitions
-        // 4. update the cluster state
     }
 
     public synchronized void addNCConfiguration(String nodeId, Map<String, String> configuration) {
         ncConfiguration.put(nodeId, configuration);
+        if (nodeId.equals(currentMetadataNode)) {
+            isMetadataNodeActive = true;
+            LOGGER.info("Metadata node is now active");
+        }
         updateNodePartitions(nodeId, true);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info(" Registering configuration parameters for node id " + nodeId);
@@ -118,8 +148,6 @@ public class AsterixClusterProperties {
                 p.setActive(added);
                 if (added) {
                     p.setActiveNodeId(nodeId);
-                } else {
-                    p.setActiveNodeId(null);
                 }
             }
             resetClusterPartitionConstraint();
@@ -135,13 +163,20 @@ public class AsterixClusterProperties {
                 return;
             }
         }
-        // if all storage partitions are active, then the cluster is active
-        state = ClusterState.ACTIVE;
-        LOGGER.info("Cluster is now ACTIVE");
+        //if all storage partitions are active as well as the metadata node, then the cluster is active
+        if (isMetadataNodeActive) {
+            state = ClusterState.ACTIVE;
+            LOGGER.info("Cluster is now ACTIVE");
+            //start global recovery
+            AsterixAppContextInfo.getInstance().getGlobalRecoveryManager().startGlobalRecovery();
+        } else {
+            requestMetadataNodeTakeover();
+        }
     }
 
     /**
      * Returns the number of IO devices configured for a Node Controller
+     *
      * @param nodeId
      *            unique identifier of the Node Controller
      * @return number of IO devices. -1 if the node id is not valid. A node id
@@ -155,6 +190,7 @@ public class AsterixClusterProperties {
 
     /**
      * Returns the IO devices configured for a Node Controller
+     *
      * @param nodeId
      *            unique identifier of the Node Controller
      * @return a list of IO devices. null if node id is not valid. A node id is not valid
@@ -257,4 +293,133 @@ public class AsterixClusterProperties {
         // virtual cluster without cluster config file
         return DEFAULT_STORAGE_DIR_NAME;
     }
-}
+
+    private synchronized void requestPartitionsTakeover(String failedNodeId) {
+        //replica -> list of partitions to takeover
+        Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
+        AsterixReplicationProperties replicationProperties = AsterixAppContextInfo.getInstance()
+                .getReplicationProperties();
+
+        //collect the partitions of the failed NC
+        List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId);
+        for (ClusterPartition partition : lostPartitions) {
+            //find replicas for this partition
+            Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId());
+            //find a replica that is still active
+            for (String replica : partitionReplicas) {
+                //TODO (mhubail) currently this assigns the partition to the first found active replica.
+                //It needs to be modified to consider load balancing.
+                if (ncConfiguration.containsKey(replica)) {
+                    if (!partitionRecoveryPlan.containsKey(replica)) {
+                        List<Integer> replicaPartitions = new ArrayList<>();
+                        replicaPartitions.add(partition.getPartitionId());
+                        partitionRecoveryPlan.put(replica, replicaPartitions);
+                    } else {
+                        partitionRecoveryPlan.get(replica).add(partition.getPartitionId());
+                    }
+                }
+            }
+        }
+
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                .getCCApplicationContext().getMessageBroker();
+        //For each replica, send a request to takeover the assigned partitions
+        for (String replica : partitionRecoveryPlan.keySet()) {
+            Integer[] partitionsToTakeover = partitionRecoveryPlan.get(replica).toArray(new Integer[] {});
+            long requestId = takeoverRequestId++;
+            TakeoverPartitionsRequestMessage takeoverRequest = new TakeoverPartitionsRequestMessage(requestId, replica,
+                    failedNodeId, partitionsToTakeover);
+            pendingTakeoverRequests.put(requestId, takeoverRequest);
+            try {
+                messageBroker.sendApplicationMessageToNC(takeoverRequest, replica);
+            } catch (Exception e) {
+                /**
+                 * if we fail to send the request, it means the NC we tried to send the request to
+                 * has failed. When the failure notification arrives, we will send any pending request
+                 * that belongs to the failed NC to a different active replica.
+                 */
+                LOGGER.warning("Failed to send takeover request: " + takeoverRequest);
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private synchronized List<ClusterPartition> getNodeAssignedPartitions(String nodeId) {
+        List<ClusterPartition> nodePartitions = new ArrayList<>();
+        for (ClusterPartition partition : clusterPartitions.values()) {
+            if (partition.getActiveNodeId().equals(nodeId)) {
+                nodePartitions.add(partition);
+            }
+        }
+        /**
+         * if there is any pending takeover request that this node was supposed to handle,
+         * it needs to be sent to a different replica
+         */
+        List<Long> failedTakeoverRequests = new ArrayList<>();
+        for (TakeoverPartitionsRequestMessage request : pendingTakeoverRequests.values()) {
+            if (request.getNodeId().equals(nodeId)) {
+                for (Integer partitionId : request.getPartitions()) {
+                    nodePartitions.add(clusterPartitions.get(partitionId));
+                }
+                failedTakeoverRequests.add(request.getId());
+            }
+        }
+
+        //remove failed requests
+        for (Long requestId : failedTakeoverRequests) {
+            pendingTakeoverRequests.remove(requestId);
+        }
+
+        return nodePartitions;
+    }
+
+    private synchronized void requestMetadataNodeTakeover() {
+        //need a new node to takeover metadata node
+        ClusterPartition metadataPartiton = AsterixAppContextInfo.getInstance().getMetadataProperties()
+                .getMetadataPartition();
+        //request the metadataPartition node to register itself as the metadata node
+        TakeoverMetadataNodeRequestMessage takeoverRequest = new TakeoverMetadataNodeRequestMessage();
+        ICCMessageBroker messageBroker = (ICCMessageBroker) AsterixAppContextInfo.getInstance()
+                .getCCApplicationContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(takeoverRequest, metadataPartiton.getActiveNodeId());
+        } catch (Exception e) {
+            /**
+             * if we fail to send the request, it means the NC we tried to send the request to
+             * has failed. When the failure notification arrives, a new NC will be assigned to
+             * the metadata partition and a new metadata node takeover request will be sent to it.
+             */
+            LOGGER.warning("Failed to send metadata node takeover request to: " + metadataPartiton.getActiveNodeId());
+            e.printStackTrace();
+        }
+    }
+
+    public synchronized void processPartitionTakeoverResponse(TakeoverPartitionsResponseMessage reponse) {
+        for (Integer partitonId : reponse.getPartitions()) {
+            ClusterPartition partition = clusterPartitions.get(partitonId);
+            partition.setActive(true);
+            partition.setActiveNodeId(reponse.getNodeId());
+        }
+        pendingTakeoverRequests.remove(reponse.getRequestId());
+        resetClusterPartitionConstraint();
+        updateClusterState();
+    }
+
+    public synchronized void processMetadataNodeTakeoverResponse(TakeoverMetadataNodeResponseMessage reponse) {
+        currentMetadataNode = reponse.getNodeId();
+        isMetadataNodeActive = true;
+        LOGGER.info("Current metadata node: " + currentMetadataNode);
+        updateClusterState();
+    }
+
+    public synchronized String getCurrentMetadataNode() {
+        return currentMetadataNode;
+    }
+
+    public boolean isAutoFailoverEnabled() {
+        if (cluster != null && cluster.getDataReplication() != null && cluster.getDataReplication().isEnabled()) {
+            return cluster.getDataReplication().isAutoFailover();
+        }
+        return false;
+    }
+}
\ No newline at end of file


[4/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance

Posted by mh...@apache.org.
Asterix NCs Fault Tolerance

This change includes the following:
- Adapt replication to unique partitions storage.
- Implement auto failover for failing NCs.
- Implement auto failover for metadata node.
- Fix for ASTERIXDB-1251 using proper error message.
- Basic replication test cases using vagrant virtual cluster for:
   1. LSM bulkload components replication.
   2. LSM Memory components replication and recovery.
   3. Metadata node takeover.
These test cases will be part of the cluster test profile.

Change-Id: Ice26d980912a315fcb3efdd571d6ce88717cfea4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/573
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/8fc8bf8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/8fc8bf8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/8fc8bf8b

Branch: refs/heads/master
Commit: 8fc8bf8b510bdc635f949f2eebf8b4d0d2a6b008
Parents: 5b068d2
Author: Murtadha Hubail <mh...@uci.edu>
Authored: Sat Jan 23 22:26:59 2016 -0800
Committer: Murtadha Hubail <hu...@gmail.com>
Committed: Tue Jan 26 15:26:32 2016 -0800

----------------------------------------------------------------------
 .../api/common/AsterixAppRuntimeContext.java    |  59 +++-
 .../bootstrap/AsterixGlobalRecoveryManager.java | 217 ------------
 .../bootstrap/CCApplicationEntryPoint.java      |  12 +-
 .../bootstrap/ClusterLifecycleListener.java     |  18 +-
 .../bootstrap/GlobalRecoveryManager.java        | 225 ++++++++++++
 .../bootstrap/NCApplicationEntryPoint.java      |  36 +-
 .../asterix/messaging/CCMessageBroker.java      |  26 +-
 .../asterix/messaging/NCMessageBroker.java      |  33 ++
 .../common/api/IAsterixAppRuntimeContext.java   |  14 +
 .../common/cluster/IGlobalRecoveryMaanger.java  |  29 ++
 .../config/AsterixMetadataProperties.java       |   4 +
 .../config/AsterixReplicationProperties.java    |  13 +-
 .../IAsterixApplicationContextInfo.java         |   3 +
 .../TakeoverMetadataNodeRequestMessage.java     |  29 ++
 .../TakeoverMetadataNodeResponseMessage.java    |  38 ++
 .../TakeoverPartitionsRequestMessage.java       |  72 ++++
 .../TakeoverPartitionsResponseMessage.java      |  50 +++
 .../messaging/api/IApplicationMessage.java      |   6 +-
 .../common/messaging/api/ICCMessageBroker.java  |  32 ++
 .../replication/IRemoteRecoveryManager.java     |  16 +
 .../replication/IReplicaResourcesManager.java   |  13 +-
 .../common/transactions/IRecoveryManager.java   |  10 +
 .../asterix/common/utils/StoragePathUtil.java   |  10 +-
 .../src/main/resources/schema/cluster.xsd       |   4 +-
 .../apache/asterix/test/aql/TestExecutor.java   |  27 ++
 .../asterix/event/util/PatternCreator.java      |  29 --
 asterix-installer/pom.xml                       |   4 +
 .../installer/command/ValidateCommand.java      |   6 -
 .../asterix/installer/test/ReplicationIT.java   | 257 ++++++++++++++
 .../clusterts/cluster_with_replication.xml      |  63 ++++
 .../src/test/resources/clusterts/known_hosts    |  10 +-
 .../failover/bulkload/bulkload.1.ddl.aql        |  54 +++
 .../failover/bulkload/bulkload.2.update.aql     |  32 ++
 .../failover/bulkload/bulkload.3.txnqbc.aql     |  31 ++
 .../bulkload/bulkload.4.vagrant_script.aql      |   1 +
 .../failover/bulkload/bulkload.5.sleep.aql      |   1 +
 .../failover/bulkload/bulkload.6.txnqar.aql     |  31 ++
 .../mem_component_recovery.1.ddl.aql            |  58 ++++
 .../mem_component_recovery.2.update.aql         |  34 ++
 .../mem_component_recovery.3.txnqbc.aql         |  32 ++
 .../mem_component_recovery.4.vagrant_script.aql |   1 +
 .../mem_component_recovery.5.sleep.aql          |   1 +
 .../mem_component_recovery.6.txnqar.aql         |  32 ++
 .../metadata_node/metadata_node.1.ddl.aql       |  55 +++
 .../metadata_node/metadata_node.2.txnqbc.aql    |  30 ++
 .../metadata_node.3.vagrant_script.aql          |   1 +
 .../metadata_node/metadata_node.4.sleep.aql     |   1 +
 .../metadata_node/metadata_node.5.txnqar.aql    |  32 ++
 .../integrationts/replication/testsuite.xml     |  37 ++
 .../metadata/bootstrap/MetadataBootstrap.java   |   6 +
 .../utils/SplitsAndConstraintsUtil.java         |   6 +
 .../asterix/om/util/AsterixAppContextInfo.java  |  19 +-
 .../om/util/AsterixClusterProperties.java       | 189 +++++++++-
 .../functions/AsterixReplicationProtocol.java   | 346 -------------------
 .../functions/ReplicationProtocol.java          | 346 +++++++++++++++++++
 .../logging/ReplicationLogBuffer.java           |   4 +-
 .../management/ReplicaEventNotifier.java        |   6 +-
 .../management/ReplicaStateChecker.java         |   4 +-
 .../management/ReplicationChannel.java          | 123 +++----
 .../management/ReplicationManager.java          |  92 +++--
 .../recovery/RemoteRecoveryManager.java         |  31 +-
 .../replication/storage/AsterixFilesUtil.java   |  78 -----
 .../storage/AsterixLSMIndexFileProperties.java  | 191 ----------
 .../storage/LSMComponentProperties.java         |  33 +-
 .../storage/LSMIndexFileProperties.java         | 162 +++++++++
 .../storage/ReplicaResourcesManager.java        | 331 ++++++++----------
 .../src/test/resources/data/fbu.adm             |  10 +
 .../test/resources/scripts/delete_storage.sh    |  18 +
 .../test/resources/scripts/kill_cc_and_nc.sh    |  18 +
 .../PersistentLocalResourceRepository.java      |  73 ++--
 ...ersistentLocalResourceRepositoryFactory.java |   8 +-
 .../service/recovery/RecoveryManager.java       | 273 +++++++++++++--
 72 files changed, 2814 insertions(+), 1382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
index 6d0b321..8a40876 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixAppRuntimeContext.java
@@ -19,7 +19,10 @@
 package org.apache.asterix.api.common;
 
 import java.io.IOException;
+import java.rmi.RemoteException;
+import java.rmi.server.UnicastRemoteObject;
 import java.util.List;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.AsterixThreadExecutor;
@@ -46,9 +49,14 @@ import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.external.feed.api.IFeedManager;
 import org.apache.asterix.external.feed.management.FeedManager;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.IAsterixStateProxy;
+import org.apache.asterix.metadata.api.IMetadataNode;
+import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.replication.management.ReplicationChannel;
@@ -85,6 +93,7 @@ import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAsterixPropertiesProvider {
+    private static final Logger LOGGER = Logger.getLogger(AsterixAppRuntimeContext.class.getName());
 
     private static final AsterixPropertiesAccessor ASTERIX_PROPERTIES_ACCESSOR;
 
@@ -128,8 +137,9 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     private IReplicationManager replicationManager;
     private IRemoteRecoveryManager remoteRecoveryManager;
     private IReplicaResourcesManager replicaResourcesManager;
+    private final int metadataRmiPort;
 
-    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext) {
+    public AsterixAppRuntimeContext(INCApplicationContext ncApplicationContext, int metadataRmiPort) {
         this.ncApplicationContext = ncApplicationContext;
         compilerProperties = new AsterixCompilerProperties(ASTERIX_PROPERTIES_ACCESSOR);
         externalProperties = new AsterixExternalProperties(ASTERIX_PROPERTIES_ACCESSOR);
@@ -140,6 +150,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         buildProperties = new AsterixBuildProperties(ASTERIX_PROPERTIES_ACCESSOR);
         replicationProperties = new AsterixReplicationProperties(ASTERIX_PROPERTIES_ACCESSOR,
                 AsterixClusterProperties.INSTANCE.getCluster());
+        this.metadataRmiPort = metadataRmiPort;
     }
 
     public void initialize(boolean initialRun) throws IOException, ACIDException, AsterixException {
@@ -159,7 +170,8 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         metadataMergePolicyFactory = new PrefixMergePolicyFactory();
 
         ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(
-                ioManager, ncApplicationContext.getNodeId());
+                ioManager, ncApplicationContext.getNodeId(), metadataProperties);
+
         localResourceRepository = persistentLocalResourceRepositoryFactory.createRepository();
 
         IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AsterixAppRuntimeContextProdiverForRecovery(
@@ -186,9 +198,7 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
         if (replicationProperties.isReplicationEnabled()) {
             String nodeId = ncApplicationContext.getNodeId();
 
-            replicaResourcesManager = new ReplicaResourcesManager(ioManager.getIODevices(),
-                    AsterixClusterProperties.INSTANCE.getStorageDirectoryName(), nodeId,
-                    replicationProperties.getReplicationStore());
+            replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties);
 
             replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager,
                     txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider);
@@ -225,14 +235,14 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
          * to process any logs that might be generated during stopping these components
          */
         lccm.register((ILifeCycleComponent) txnSubsystem.getLogManager());
-        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
         /**
-         * ReplicationManager must be stopped after indexLifecycleManager so that any logs/files generated
-         * during closing datasets are sent to remote replicas
+         * ReplicationManager must be stopped after indexLifecycleManager and recovery manager
+         * so that any logs/files generated during closing datasets or checkpoints are sent to remote replicas
          */
         if (replicationManager != null) {
             lccm.register(replicationManager);
         }
+        lccm.register((ILifeCycleComponent) txnSubsystem.getRecoveryManager());
         /**
          * Stopping indexLifecycleManager will flush and close all datasets.
          */
@@ -380,4 +390,35 @@ public class AsterixAppRuntimeContext implements IAsterixAppRuntimeContext, IAst
     public void initializeResourceIdFactory() throws HyracksDataException {
         resourceIdFactory = new GlobalResourceIdFactoryProvider(ncApplicationContext).createResourceIdFactory();
     }
+
+    @Override
+    public void initializeMetadata(boolean newUniverse) throws Exception {
+        IAsterixStateProxy proxy = null;
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Bootstrapping metadata");
+        }
+        MetadataNode.INSTANCE.initialize(this);
+
+        proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
+        if (proxy == null) {
+            throw new IllegalStateException("Metadata node cannot access distributed state");
+        }
+
+        // Special case: the MetadataNode instance is handed to the MetadataManager
+        // directly. This way we can delay the registration of the metadata node
+        // until it is completely initialized.
+        MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
+        MetadataBootstrap.startUniverse(this, ncApplicationContext, newUniverse);
+        MetadataBootstrap.startDDLRecovery();
+
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Metadata node bound");
+        }
+    }
+
+    @Override
+    public void exportMetadataNodeStub() throws RemoteException {
+        IMetadataNode stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
+        ((IAsterixStateProxy) ncApplicationContext.getDistributedState()).setMetadataNode(stub);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
deleted file mode 100644
index 4fae7e9..0000000
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.hyracks.bootstrap;
-
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.api.IClusterManagementWorkResponse;
-import org.apache.asterix.common.config.MetadataConstants;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
-import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.external.indexing.ExternalFile;
-import org.apache.asterix.feed.CentralFeedManager;
-import org.apache.asterix.file.ExternalIndexingOperations;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataTransactionContext;
-import org.apache.asterix.metadata.declared.AqlMetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
-import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.om.util.AsterixClusterProperties;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber {
-
-    private static ClusterState state;
-    private static final Logger LOGGER = Logger.getLogger(AsterixGlobalRecoveryManager.class.getName());
-    private HyracksConnection hcc;
-    public static AsterixGlobalRecoveryManager INSTANCE;
-
-    public AsterixGlobalRecoveryManager(HyracksConnection hcc) throws Exception {
-        state = AsterixClusterProperties.INSTANCE.getState();
-        this.hcc = hcc;
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
-        state = AsterixClusterProperties.INSTANCE.getState();
-        AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
-        return null;
-    }
-
-    @Override
-    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
-        // perform global recovery if state changed to active
-        final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
-        boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
-        if (needToRecover) {
-            Thread recoveryThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
-                    LOGGER.info("Starting AsterixDB's Global Recovery");
-                    MetadataTransactionContext mdTxnCtx = null;
-                    try {
-                        Thread.sleep(4000);
-                        MetadataManager.INSTANCE.init();
-                        // Loop over datasets
-                        mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                        List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
-                        for (Dataverse dataverse : dataverses) {
-                            if (!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
-                                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse, CentralFeedManager.getInstance());
-                                List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
-                                        dataverse.getDataverseName());
-                                for (Dataset dataset : datasets) {
-                                    if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
-                                        // External dataset
-                                        // Get indexes
-                                        List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
-                                                dataset.getDataverseName(), dataset.getDatasetName());
-                                        if (indexes.size() > 0) {
-                                            // Get the state of the dataset
-                                            ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset
-                                                    .getDatasetDetails();
-                                            ExternalDatasetTransactionState datasetState = dsd.getState();
-                                            if (datasetState == ExternalDatasetTransactionState.BEGIN) {
-                                                List<ExternalFile> files = MetadataManager.INSTANCE
-                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
-                                                // if persumed abort, roll backward
-                                                // 1. delete all pending files
-                                                for (ExternalFile file : files) {
-                                                    if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
-                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                                                    }
-                                                }
-                                                // 2. clean artifacts in NCs
-                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(
-                                                        dataset, indexes, metadataProvider);
-                                                executeHyracksJob(jobSpec);
-                                                // 3. correct the dataset state
-                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
-                                                        .setState(ExternalDatasetTransactionState.COMMIT);
-                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
-                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                                            } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) {
-                                                List<ExternalFile> files = MetadataManager.INSTANCE
-                                                        .getDatasetExternalFiles(mdTxnCtx, dataset);
-                                                // if ready to commit, roll forward
-                                                // 1. commit indexes in NCs
-                                                metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                                                JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(
-                                                        dataset, indexes, metadataProvider);
-                                                executeHyracksJob(jobSpec);
-                                                // 2. add pending files in metadata
-                                                for (ExternalFile file : files) {
-                                                    if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) {
-                                                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
-                                                        file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
-                                                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
-                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
-                                                        // find original file
-                                                        for (ExternalFile originalFile : files) {
-                                                            if (originalFile.getFileName().equals(file.getFileName())) {
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        file);
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                                break;
-                                                            }
-                                                        }
-                                                    } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
-                                                        // find original file
-                                                        for (ExternalFile originalFile : files) {
-                                                            if (originalFile.getFileName().equals(file.getFileName())) {
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        file);
-                                                                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                                originalFile.setSize(file.getSize());
-                                                                MetadataManager.INSTANCE.addExternalFile(mdTxnCtx,
-                                                                        originalFile);
-                                                            }
-                                                        }
-                                                    }
-                                                }
-                                                // 3. correct the dataset state
-                                                ((ExternalDatasetDetails) dataset.getDatasetDetails())
-                                                        .setState(ExternalDatasetTransactionState.COMMIT);
-                                                MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
-                                                MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                                                mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-                    } catch (Exception e) {
-                        // This needs to be fixed <-- Needs to shutdown the system -->
-                        /*
-                         * Note: Throwing this illegal state exception will terminate this thread
-                         * and feeds listeners will not be notified.
-                         */
-                        LOGGER.severe("Global recovery was not completed successfully" + e);
-                        try {
-                            MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
-                        } catch (Exception e1) {
-                            if (LOGGER.isLoggable(Level.SEVERE)) {
-                                LOGGER.severe("Exception in aborting" + e.getMessage());
-                            }
-                            throw new IllegalStateException(e1);
-                        }
-                    }
-                    AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(true);
-                    LOGGER.info("Global Recovery Completed");
-                }
-            });
-            state = newState;
-            recoveryThread.start();
-        }
-        return null;
-    }
-
-    private void executeHyracksJob(JobSpecification spec) throws Exception {
-        spec.setMaxReattempts(0);
-        JobId jobId = hcc.startJob(spec);
-        hcc.waitForCompletion(jobId);
-    }
-
-    @Override
-    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
-        // Do nothing
-    }
-
-    @Override
-    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
-        // Do nothing?
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index 2a7b3e4..f2fc9bf 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -79,7 +79,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
 
     @Override
     public void start(ICCApplicationContext ccAppCtx, String[] args) throws Exception {
-        messageBroker = new CCMessageBroker((ClusterControllerService)ccAppCtx.getControllerService());
+        messageBroker = new CCMessageBroker((ClusterControllerService) ccAppCtx.getControllerService());
         this.appCtx = ccAppCtx;
 
         if (LOGGER.isLoggable(Level.INFO)) {
@@ -87,7 +87,11 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         }
 
         appCtx.setThreadFactory(new AsterixThreadFactory(new LifeCycleComponentManager()));
-        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection());
+        GlobalRecoveryManager.INSTANCE = new GlobalRecoveryManager(
+                (HyracksConnection) getNewHyracksClientConnection());
+
+        AsterixAppContextInfo.initialize(appCtx, getNewHyracksClientConnection(),
+                GlobalRecoveryManager.INSTANCE);
 
         proxy = AsterixStateProxy.registerRemoteObject();
         appCtx.setDistributedState(proxy);
@@ -111,9 +115,7 @@ public class CCApplicationEntryPoint implements ICCApplicationEntryPoint {
         centralFeedManager = CentralFeedManager.getInstance();
         centralFeedManager.start();
 
-        AsterixGlobalRecoveryManager.INSTANCE = new AsterixGlobalRecoveryManager(
-                (HyracksConnection) getNewHyracksClientConnection());
-        ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE);
+        ClusterManager.INSTANCE.registerSubscriber(GlobalRecoveryManager.INSTANCE);
 
         AsterixReplicationProperties asterixRepliactionProperties = AsterixAppContextInfo.getInstance()
                 .getReplicationProperties();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 9505692..00b7391 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -95,19 +95,12 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info("NC: " + deadNode + " left");
             }
-            AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
-
             //if metadata node failed, we need to rebind the proxy connection when it joins again.
-            //Note: the format for the NC should be (INSTANCE-NAME)_(NC-ID)
-            if (AsterixClusterProperties.INSTANCE.getCluster() != null) {
-                String instanceName = AsterixClusterProperties.INSTANCE.getCluster().getInstanceName();
-                String metadataNodeName = AsterixClusterProperties.INSTANCE.getCluster().getMetadataNode();
-                String completeMetadataNodeName = instanceName + "_" + metadataNodeName;
-                if (deadNode.equals(completeMetadataNodeName)) {
-                    MetadataManager.INSTANCE.rebindMetadataNode = true;
-                }
+            String metadataNode = AsterixClusterProperties.INSTANCE.getCurrentMetadataNode();
+            if (deadNode.equals(metadataNode)) {
+                MetadataManager.INSTANCE.rebindMetadataNode = true;
             }
-
+            AsterixClusterProperties.INSTANCE.removeNCConfiguration(deadNode);
         }
         updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
         Set<IClusterEventsSubscriber> subscribers = ClusterManager.INSTANCE.getRegisteredClusterEventSubscribers();
@@ -166,7 +159,8 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener {
                 case REMOVE_NODE:
                     nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved());
                     nodeRemovalRequests.add(w);
-                    RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS);
+                    RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w,
+                            Status.IN_PROGRESS);
                     pendingWorkResponses.add(response);
                     break;
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
new file mode 100644
index 0000000..2bac1cf
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.hyracks.bootstrap;
+
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IClusterManagementWorkResponse;
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState;
+import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.config.MetadataConstants;
+import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.feed.CentralFeedManager;
+import org.apache.asterix.file.ExternalIndexingOperations;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.AqlMetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.om.util.AsterixClusterProperties;
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobSpecification;
+
+/**
+ * Cluster events subscriber that runs AsterixDB's global recovery when the cluster
+ * state changes to ACTIVE. Recovery walks every non-metadata dataverse and, for each
+ * external dataset that has indexes and an unfinished transaction, rolls it backward
+ * (state BEGIN) or forward (state READY_TO_COMMIT).
+ */
+public class GlobalRecoveryManager implements IGlobalRecoveryMaanger {
+
+    // Last cluster state seen by this manager; static, hence shared by all instances.
+    private static ClusterState state;
+    private static final Logger LOGGER = Logger.getLogger(GlobalRecoveryManager.class.getName());
+    private HyracksConnection hcc;
+    public static GlobalRecoveryManager INSTANCE;
+
+    /**
+     * @param hcc
+     *            connection used to submit the recovery jobs
+     */
+    public GlobalRecoveryManager(HyracksConnection hcc) throws Exception {
+        state = ClusterState.UNUSABLE;
+        this.hcc = hcc;
+    }
+
+    /**
+     * Remembers the cluster state at the time of the failure and flags global recovery
+     * as not completed.
+     *
+     * @return always null (no cluster management work is requested)
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) {
+        state = AsterixClusterProperties.INSTANCE.getState();
+        AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false);
+        return null;
+    }
+
+    /**
+     * Attempts to start global recovery whenever a node joins.
+     *
+     * @return always null (no cluster management work is requested)
+     */
+    @Override
+    public Set<IClusterManagementWork> notifyNodeJoin(String joinedNodeId) {
+        startGlobalRecovery();
+        return null;
+    }
+
+    /**
+     * Submits the job with re-attempts disabled and waits for it to complete.
+     */
+    private void executeHyracksJob(JobSpecification spec) throws Exception {
+        spec.setMaxReattempts(0);
+        JobId jobId = hcc.startJob(spec);
+        hcc.waitForCompletion(jobId);
+    }
+
+    @Override
+    public void notifyRequestCompletion(IClusterManagementWorkResponse response) {
+        // Do nothing
+    }
+
+    @Override
+    public void notifyStateChange(ClusterState previousState, ClusterState newState) {
+        // Do nothing?
+    }
+
+    /**
+     * Starts global recovery on a new thread, but only when the cluster state differs
+     * from the last recorded one and is now ACTIVE.
+     */
+    @Override
+    public void startGlobalRecovery() {
+        // perform global recovery if state changed to active
+        final ClusterState newState = AsterixClusterProperties.INSTANCE.getState();
+        boolean needToRecover = !newState.equals(state) && (newState == ClusterState.ACTIVE);
+        if (needToRecover) {
+            Thread recoveryThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    performGlobalRecovery();
+                }
+            });
+            // Record the new state before starting so that a repeated call does not recover again.
+            state = newState;
+            recoveryThread.start();
+        }
+    }
+
+    /**
+     * Body of the recovery thread: recovers every external dataset with a pending
+     * transaction, then marks global recovery as completed.
+     */
+    private void performGlobalRecovery() {
+        LOGGER.info("Starting AsterixDB's Global Recovery");
+        MetadataTransactionContext mdTxnCtx = null;
+        try {
+            // NOTE(review): the reason for this fixed delay is not evident from this class.
+            Thread.sleep(4000);
+            MetadataManager.INSTANCE.init();
+            // Loop over datasets
+            mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+            List<Dataverse> dataverses = MetadataManager.INSTANCE.getDataverses(mdTxnCtx);
+            for (Dataverse dataverse : dataverses) {
+                if (dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME)) {
+                    continue;
+                }
+                AqlMetadataProvider metadataProvider = new AqlMetadataProvider(dataverse,
+                        CentralFeedManager.getInstance());
+                List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                        dataverse.getDataverseName());
+                for (Dataset dataset : datasets) {
+                    if (dataset.getDatasetType() != DatasetType.EXTERNAL) {
+                        continue;
+                    }
+                    // External dataset: only datasets that have indexes need recovery
+                    List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+                            dataset.getDataverseName(), dataset.getDatasetName());
+                    if (indexes.isEmpty()) {
+                        continue;
+                    }
+                    // Get the state of the dataset
+                    ExternalDatasetDetails dsd = (ExternalDatasetDetails) dataset.getDatasetDetails();
+                    ExternalDatasetTransactionState datasetState = dsd.getState();
+                    if (datasetState == ExternalDatasetTransactionState.BEGIN) {
+                        // if presumed abort, roll backward
+                        mdTxnCtx = rollBackward(mdTxnCtx, dataset, indexes, metadataProvider);
+                    } else if (datasetState == ExternalDatasetTransactionState.READY_TO_COMMIT) {
+                        // if ready to commit, roll forward
+                        mdTxnCtx = rollForward(mdTxnCtx, dataset, indexes, metadataProvider);
+                    }
+                }
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            // This needs to be fixed <-- Needs to shutdown the system -->
+            /*
+             * Note: Throwing this illegal state exception will terminate this thread
+             * and feeds listeners will not be notified.
+             */
+            if (e instanceof InterruptedException) {
+                // Keep the interrupt visible to whoever inspects this thread afterwards.
+                Thread.currentThread().interrupt();
+            }
+            LOGGER.log(Level.SEVERE, "Global recovery was not completed successfully", e);
+            // Nothing to abort if the failure happened before a transaction was begun.
+            if (mdTxnCtx != null) {
+                try {
+                    MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
+                } catch (Exception e1) {
+                    // Log the abort failure itself, not the original recovery failure.
+                    LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
+                    throw new IllegalStateException(e1);
+                }
+            }
+        }
+        // Reached on success and also after a failed recovery whose transaction was aborted.
+        AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(true);
+        LOGGER.info("Global Recovery Completed");
+    }
+
+    /**
+     * Rolls back an external dataset whose transaction is in state BEGIN.
+     *
+     * @return the metadata transaction context to use for the remaining recovery work
+     */
+    private MetadataTransactionContext rollBackward(MetadataTransactionContext mdTxnCtx, Dataset dataset,
+            List<Index> indexes, AqlMetadataProvider metadataProvider) throws Exception {
+        List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+        // 1. delete all pending files
+        for (ExternalFile file : files) {
+            if (file.getPendingOp() != ExternalFilePendingOp.PENDING_NO_OP) {
+                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+            }
+        }
+        // 2. clean artifacts in NCs
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider);
+        executeHyracksJob(jobSpec);
+        // 3. correct the dataset state
+        return commitDatasetState(mdTxnCtx, dataset);
+    }
+
+    /**
+     * Rolls forward an external dataset whose transaction is in state READY_TO_COMMIT.
+     *
+     * @return the metadata transaction context to use for the remaining recovery work
+     */
+    private MetadataTransactionContext rollForward(MetadataTransactionContext mdTxnCtx, Dataset dataset,
+            List<Index> indexes, AqlMetadataProvider metadataProvider) throws Exception {
+        List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+        // 1. commit indexes in NCs
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        JobSpecification jobSpec = ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider);
+        executeHyracksJob(jobSpec);
+        // 2. add pending files in metadata
+        for (ExternalFile file : files) {
+            if (file.getPendingOp() == ExternalFilePendingOp.PENDING_ADD_OP) {
+                MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                file.setPendingOp(ExternalFilePendingOp.PENDING_NO_OP);
+                MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, file);
+            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_DROP_OP) {
+                // find original file
+                for (ExternalFile originalFile : files) {
+                    if (originalFile.getFileName().equals(file.getFileName())) {
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, originalFile);
+                        break;
+                    }
+                }
+            } else if (file.getPendingOp() == ExternalFilePendingOp.PENDING_APPEND_OP) {
+                // find original file
+                // NOTE(review): unlike the drop case, this loop does not stop at the first match - confirm.
+                for (ExternalFile originalFile : files) {
+                    if (originalFile.getFileName().equals(file.getFileName())) {
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, file);
+                        MetadataManager.INSTANCE.dropExternalFile(mdTxnCtx, originalFile);
+                        originalFile.setSize(file.getSize());
+                        MetadataManager.INSTANCE.addExternalFile(mdTxnCtx, originalFile);
+                    }
+                }
+            }
+        }
+        // 3. correct the dataset state
+        return commitDatasetState(mdTxnCtx, dataset);
+    }
+
+    /**
+     * Marks the dataset as COMMIT, commits the given metadata transaction and begins a new one.
+     *
+     * @return the newly begun metadata transaction context
+     */
+    private MetadataTransactionContext commitDatasetState(MetadataTransactionContext mdTxnCtx, Dataset dataset)
+            throws Exception {
+        ((ExternalDatasetDetails) dataset.getDatasetDetails()).setState(ExternalDatasetTransactionState.COMMIT);
+        MetadataManager.INSTANCE.updateDataset(mdTxnCtx, dataset);
+        MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        return MetadataManager.INSTANCE.beginTransaction();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 3341387..dac9af5 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -20,7 +20,6 @@ package org.apache.asterix.hyracks.bootstrap;
 
 import java.io.File;
 import java.io.IOException;
-import java.rmi.server.UnicastRemoteObject;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,10 +41,6 @@ import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.messaging.NCMessageBroker;
-import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.metadata.MetadataNode;
-import org.apache.asterix.metadata.api.IAsterixStateProxy;
-import org.apache.asterix.metadata.api.IMetadataNode;
 import org.apache.asterix.metadata.bootstrap.MetadataBootstrap;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
@@ -102,7 +97,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
         }
 
-        runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext);
+        runtimeContext = new AsterixAppRuntimeContext(ncApplicationContext, metadataRmiPort);
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
                 .getMetadataProperties();
         if (!metadataProperties.getNodeNames().contains(ncApplicationContext.getNodeId())) {
@@ -215,33 +210,12 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
             localResourceRepository.initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
-        IAsterixStateProxy proxy = null;
         isMetadataNode = nodeId.equals(metadataProperties.getMetadataNodeName());
         if (isMetadataNode) {
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Bootstrapping metadata");
-            }
-            MetadataNode.INSTANCE.initialize(runtimeContext);
-
-            proxy = (IAsterixStateProxy) ncApplicationContext.getDistributedState();
-            if (proxy == null) {
-                throw new IllegalStateException("Metadata node cannot access distributed state");
-            }
-
-            //This is a special case, we just give the metadataNode directly.
-            //This way we can delay the registration of the metadataNode until
-            //it is completely initialized.
-            MetadataManager.INSTANCE = new MetadataManager(proxy, MetadataNode.INSTANCE);
-            MetadataBootstrap.startUniverse(((IAsterixPropertiesProvider) runtimeContext), ncApplicationContext,
-                    systemState == SystemState.NEW_UNIVERSE);
-            MetadataBootstrap.startDDLRecovery();
-
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Metadata node bound");
-            }
+            runtimeContext.initializeMetadata(systemState == SystemState.NEW_UNIVERSE);
         }
-
         ExternalLibraryBootstrap.setUpExternaLibraries(isMetadataNode);
+
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting lifecycle components");
         }
@@ -267,9 +241,7 @@ public class NCApplicationEntryPoint implements INCApplicationEntryPoint {
         recoveryMgr.checkpoint(true, RecoveryManager.NON_SHARP_CHECKPOINT_TARGET_LSN);
 
         if (isMetadataNode) {
-            IMetadataNode stub = null;
-            stub = (IMetadataNode) UnicastRemoteObject.exportObject(MetadataNode.INSTANCE, metadataRmiPort);
-            proxy.setMetadataNode(stub);
+            runtimeContext.exportMetadataNodeStub();
         }
 
         //Clean any temporary files

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index 095ef1b..aeaef59 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -29,14 +29,17 @@ import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestMessage;
 import org.apache.asterix.common.messaging.ResourceIdRequestResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
+import org.apache.asterix.common.messaging.api.IApplicationMessage;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.hyracks.api.messages.IMessage;
-import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 
-public class CCMessageBroker implements IMessageBroker {
+public class CCMessageBroker implements ICCMessageBroker {
 
     private final static Logger LOGGER = Logger.getLogger(CCMessageBroker.class.getName());
     private final AtomicLong globalResourceId = new AtomicLong(0);
@@ -58,6 +61,12 @@ public class CCMessageBroker implements IMessageBroker {
             case REPORT_MAX_RESOURCE_ID_RESPONSE:
                 handleReportResourceMaxIdResponse(message, nodeId);
                 break;
+            case TAKEOVER_PARTITIONS_RESPONSE:
+                handleTakeoverPartitionsResponse(message);
+                break;
+            case TAKEOVER_METADATA_NODE_RESPONSE:
+                handleTakeoverMetadataNodeResponse(message);
+                break;
             default:
                 LOGGER.warning("Unknown message: " + absMessage.getMessageType());
                 break;
@@ -89,7 +98,8 @@ public class CCMessageBroker implements IMessageBroker {
         nodesReportedMaxResourceId.add(nodeId);
     }
 
-    private void sendApplicationMessageToNC(IMessage msg, String nodeId) throws Exception {
+    @Override
+    public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception {
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         NodeControllerState state = nodeMap.get(nodeId);
         state.getNodeController().sendApplicationMessageToNC(JavaSerializationUtils.serialize(msg), null, nodeId);
@@ -106,4 +116,14 @@ public class CCMessageBroker implements IMessageBroker {
             }
         }
     }
+
+    private void handleTakeoverPartitionsResponse(IMessage message) {
+        TakeoverPartitionsResponseMessage msg = (TakeoverPartitionsResponseMessage) message;
+        AsterixClusterProperties.INSTANCE.processPartitionTakeoverResponse(msg);
+    }
+
+    private void handleTakeoverMetadataNodeResponse(IMessage message) {
+        TakeoverMetadataNodeResponseMessage msg = (TakeoverMetadataNodeResponseMessage) message;
+        AsterixClusterProperties.INSTANCE.processMetadataNodeTakeoverResponse(msg);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 001771e..8f8723e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -25,9 +25,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage;
+import org.apache.asterix.common.messaging.TakeoverMetadataNodeResponseMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsRequestMessage;
+import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessage;
 import org.apache.asterix.common.messaging.api.IApplicationMessageCallback;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
+import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -75,11 +79,40 @@ public class NCMessageBroker implements INCMessageBroker {
             case REPORT_MAX_RESOURCE_ID_REQUEST:
                 reportMaxResourceId();
                 break;
+            case TAKEOVER_PARTITIONS_REQUEST:
+                handleTakeoverPartitons(message);
+                break;
+            case TAKEOVER_METADATA_NODE_REQUEST:
+                handleTakeoverMetadataNode(message);
+                break;
             default:
                 break;
         }
     }
 
+    private void handleTakeoverPartitons(IMessage message) throws Exception {
+        TakeoverPartitionsRequestMessage msg = (TakeoverPartitionsRequestMessage) message;
+        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+                .getApplicationObject();
+        IRemoteRecoveryManager remoteRecoeryManager = appContext.getRemoteRecoveryManager();
+        remoteRecoeryManager.takeoverPartitons(msg.getFailedNode(), msg.getPartitions());
+        //send response after takeover is completed
+        TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
+                appContext.getTransactionSubsystem().getId(), msg.getPartitions());
+        sendMessage(reponse, null);
+    }
+
+    private void handleTakeoverMetadataNode(IMessage message) throws Exception {
+        IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()
+                .getApplicationObject();
+        appContext.initializeMetadata(false);
+        appContext.exportMetadataNodeStub();
+        //send response after takeover is completed
+        TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
+                appContext.getTransactionSubsystem().getId());
+        sendMessage(reponse, null);
+    }
+
     @Override
     public void reportMaxResourceId() throws Exception {
         IAsterixAppRuntimeContext appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext()

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
index 3386252..975180b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/api/IAsterixAppRuntimeContext.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.common.api;
 
 import java.io.IOException;
+import java.rmi.RemoteException;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -89,4 +90,17 @@ public interface IAsterixAppRuntimeContext {
     public IReplicationChannel getReplicationChannel();
 
     public void initializeResourceIdFactory() throws HyracksDataException;
+
+    /**
+     * Exports the metadata node to the metadata RMI port.
+     *
+     * @throws RemoteException
+     *             if the export fails
+     */
+    public void exportMetadataNodeStub() throws RemoteException;
+
+    /**
+     * Initializes the metadata node and bootstraps the metadata.
+     *
+     * @param newUniverse
+     *            true when the metadata is bootstrapped for a new universe, false otherwise
+     * @throws Exception
+     *             if the initialization or the bootstrap fails
+     */
+    public void initializeMetadata(boolean newUniverse) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
new file mode 100644
index 0000000..48b1e73
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/cluster/IGlobalRecoveryMaanger.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.cluster;
+
+import org.apache.asterix.common.api.IClusterEventsSubscriber;
+
+/**
+ * A cluster events subscriber that can additionally be asked to run global recovery.
+ */
+public interface IGlobalRecoveryMaanger extends IClusterEventsSubscriber {
+
+    /**
+     * Triggers the global recovery process; it is meant to run only when the cluster
+     * state has changed to ACTIVE.
+     */
+    void startGlobalRecovery();
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 8e2c4e7..473a163 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -65,4 +65,8 @@ public class AsterixMetadataProperties extends AbstractAsterixProperties {
     public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
         return accessor.getClusterPartitions();
     }
+
+    /**
+     * Returns the transaction log directories as provided by the property accessor.
+     * NOTE(review): the map is presumably keyed by node id with the log directory as
+     * the value - confirm against the accessor.
+     *
+     * @return the accessor's transaction log directories map
+     */
+    public Map<String, String> getTransactionLogDirs() {
+        return accessor.getTransactionLogDirs();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
index 1ef7e3e..fa5b503 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixReplicationProperties.java
@@ -36,7 +36,6 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
     private static int REPLICATION_TIME_OUT_DEFAULT = 15;
 
     private static final String NODE_IP_ADDRESS_DEFAULT = "127.0.0.1";
-    private static final String REPLICATION_STORE_DEFAULT = "asterix-replication";
     private final String NODE_NAME_PREFIX;
     private final Cluster cluster;
 
@@ -102,8 +101,8 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
             }
 
             if (nodeIndex == -1) {
-                LOGGER.log(Level.WARNING, "Could not find node " + getRealCluserNodeID(nodeId)
-                        + " in cluster configurations");
+                LOGGER.log(Level.WARNING,
+                        "Could not find node " + getRealCluserNodeID(nodeId) + " in cluster configurations");
                 return null;
             }
 
@@ -179,13 +178,6 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
         return replicaIds;
     }
 
-    public String getReplicationStore() {
-        if (cluster != null) {
-            return cluster.getDataReplication().getReplicationStore();
-        }
-        return REPLICATION_STORE_DEFAULT;
-    }
-
     public int getReplicationFactor() {
         if (cluster != null) {
             if (cluster.getDataReplication() == null || cluster.getDataReplication().getReplicationFactor() == null) {
@@ -202,5 +194,4 @@ public class AsterixReplicationProperties extends AbstractAsterixProperties {
         }
         return REPLICATION_TIME_OUT_DEFAULT;
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
index 8dc3efe..a5cd72b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/dataflow/IAsterixApplicationContextInfo.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.common.dataflow;
 
+import org.apache.asterix.common.cluster.IGlobalRecoveryMaanger;
 import org.apache.hyracks.api.application.ICCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import org.apache.hyracks.storage.common.IStorageManagerInterface;
@@ -48,4 +49,6 @@ public interface IAsterixApplicationContextInfo {
      * @return ICCApplicationContext implementation instance
      */
     public ICCApplicationContext getCCApplicationContext();
+
+    public IGlobalRecoveryMaanger getGlobalRecoveryManager();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
new file mode 100644
index 0000000..78b86a8
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeRequestMessage.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_REQUEST;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
new file mode 100644
index 0000000..78f7429
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverMetadataNodeResponseMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverMetadataNodeResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+
+    public TakeoverMetadataNodeResponseMessage(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_METADATA_NODE_RESPONSE;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
new file mode 100644
index 0000000..abfa7d2
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsRequestMessage.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsRequestMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String failedNode;
+    private final long requestId;
+    private final String nodeId;
+
+    public TakeoverPartitionsRequestMessage(long requestId, String nodeId, String failedNode,
+            Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.failedNode = failedNode;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_REQUEST;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+
+    public String getFailedNode() {
+        return failedNode;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    /** Renders the request on one line; partition ids are comma-separated with no trailing comma. */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Request ID: ").append(requestId);
+        sb.append(" Node ID: ").append(nodeId);
+        sb.append(" Failed Node: ").append(failedNode);
+        sb.append(" Partitions: ");
+        for (int i = 0; i < partitions.length; i++) {
+            //comma goes between entries only, so nothing has to be trimmed (safe for an empty array)
+            sb.append(i > 0 ? "," : "").append(partitions[i]);
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
new file mode 100644
index 0000000..86eb3cb
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/TakeoverPartitionsResponseMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging;
+
+public class TakeoverPartitionsResponseMessage extends AbstractApplicationMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final Integer[] partitions;
+    private final String nodeId;
+    private final long requestId;
+
+    public TakeoverPartitionsResponseMessage(long requestId, String nodeId, Integer[] partitionsToTakeover) {
+        this.requestId = requestId;
+        this.nodeId = nodeId;
+        this.partitions = partitionsToTakeover;
+    }
+
+    @Override
+    public ApplicationMessageType getMessageType() {
+        return ApplicationMessageType.TAKEOVER_PARTITIONS_RESPONSE;
+    }
+
+    public Integer[] getPartitions() {
+        return partitions;
+    }
+
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public long getRequestId() {
+        return requestId;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 61ab7cd..57a0dae 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -26,7 +26,11 @@ public interface IApplicationMessage extends IMessage {
         RESOURCE_ID_REQUEST,
         RESOURCE_ID_RESPONSE,
         REPORT_MAX_RESOURCE_ID_REQUEST,
-        REPORT_MAX_RESOURCE_ID_RESPONSE
+        REPORT_MAX_RESOURCE_ID_RESPONSE,
+        TAKEOVER_PARTITIONS_REQUEST,
+        TAKEOVER_PARTITIONS_RESPONSE,
+        TAKEOVER_METADATA_NODE_REQUEST,
+        TAKEOVER_METADATA_NODE_RESPONSE
     }
 
     public abstract ApplicationMessageType getMessageType();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
new file mode 100644
index 0000000..7dafbd5
--- /dev/null
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/ICCMessageBroker.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.messaging.api;
+
+import org.apache.hyracks.api.messages.IMessageBroker;
+
+public interface ICCMessageBroker extends IMessageBroker {
+
+    /**
+     * Sends the passed message to the specified {@code nodeId}
+     * @param msg
+     * @param nodeId
+     * @throws Exception
+     */
+    public void sendApplicationMessageToNC(IApplicationMessage msg, String nodeId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
index 63d29a0..ecc9494 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IRemoteRecoveryManager.java
@@ -18,8 +18,24 @@
  */
 package org.apache.asterix.common.replication;
 
+import java.io.IOException;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+
 public interface IRemoteRecoveryManager {
 
+    /**
+     * Attempts to perform the remote recovery process from an active remote replica.
+     */
     public void performRemoteRecovery();
 
+    /**
+     * Performs the partitions takeover process from the {@code failedNode}
+     * @param failedNode
+     * @param partitions
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException;
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
index c796f37..f13d300 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicaResourcesManager.java
@@ -22,10 +22,15 @@ import java.util.Set;
 
 public interface IReplicaResourcesManager {
 
-    public String getIndexPath(String backupNodeId, int IODeviceNum, String dataverse, String dataset);
-
-    public String getLocalStorageFolder();
-
+    /**
+     * @param remoteNodes
+     * @return The minimum LSN of all indexes that belong to {@code remoteNodes}.
+     */
     public long getMinRemoteLSN(Set<String> remoteNodes);
 
+    /**
+     * @param partitions
+     * @return the minimum LSN of all indexes that belong to {@code partitions}.
+     */
+    public long getPartitionsMinLSN(Integer[] partitions);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 9ea9957..a2b7a82 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -111,4 +111,14 @@ public interface IRecoveryManager {
      * @throws HyracksDataException
      */
     public long getLocalMinFirstLSN() throws HyracksDataException;
+
+    /**
+     * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN}
+     * @param partitions
+     * @param lowWaterMarkLSN
+     * @param failedNode
+     * @throws IOException
+     * @throws ACIDException
+     */
+    public void replayPartitionsLogs(Integer[] partitions, long lowWaterMarkLSN, String failedNode) throws IOException, ACIDException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index acfb9d5..48e42bd 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -56,6 +56,14 @@ public class StoragePathUtil {
     }
 
     public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
-        return dataverseName + File.separator + datasetName + StoragePathUtil.DATASET_INDEX_NAME_SEPARATOR + idxName;
+        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+    }
+
+    public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
+        return dataverseName + File.separator + fullIndexName;
+    }
+
+    private static String prepareFullIndexName(String datasetName, String idxName) {
+        return (datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-common/src/main/resources/schema/cluster.xsd
----------------------------------------------------------------------
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 872c959..e0605f0 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -47,7 +47,7 @@
 	<xs:element name="enabled" type="xs:boolean" />
 	<xs:element name="replication_port" type="xs:integer" />
 	<xs:element name="replication_factor" type="xs:integer" />
-	<xs:element name="replication_store" type="xs:string" />
+	<xs:element name="auto_failover" type="xs:boolean" />
 	<xs:element name="replication_time_out" type="xs:integer" />
 
 	<!-- definition of complex elements -->
@@ -82,7 +82,7 @@
 				<xs:element ref="cl:enabled" />
 				<xs:element ref="cl:replication_port" />
 				<xs:element ref="cl:replication_factor" />
-				<xs:element ref="cl:replication_store" />
+				<xs:element ref="cl:auto_failover" />
 				<xs:element ref="cl:replication_time_out" />
 			</xs:sequence>
 		</xs:complexType>


[2/4] incubator-asterixdb git commit: Asterix NCs Fault Tolerance

Posted by mh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
deleted file mode 100644
index 8e020eb..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/AsterixReplicationProtocol.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.functions;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-import org.apache.asterix.common.replication.Replica;
-import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.management.NetworkingUtil;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
-import org.apache.asterix.replication.storage.LSMComponentProperties;
-
-public class AsterixReplicationProtocol {
-
-    /**
-     * All replication messages start with ReplicationFunctions (4 bytes), then the length of the request in bytes
-     */
-    public static final String JOB_COMMIT_ACK = "$";
-
-    public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
-    public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
-
-    /* 
-     * ReplicationRequestType:
-     * REPLICATE_LOG: txn log replication
-     * REPLICATE_FILE: replicate a file(s)
-     * DELETE_FILE: delete a file(s)
-     * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
-     * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
-     * GET_REPLICA_MAX_LSN: used during remote recovery initialize a log manager LSN
-     * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
-     * UPDATE_REPLICA: used to update replica info such as IP Address change.
-     * GOODBYE: used to notify replicas that the replication request has been completed
-     * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
-     * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
-     * ACK: used to notify the requesting replica that the request has been completed successfully
-     * FLUSH_INDEX: request remote replica to flush an LSM component
-     */
-    public enum ReplicationRequestType {
-        REPLICATE_LOG,
-        REPLICATE_FILE,
-        DELETE_FILE,
-        GET_REPLICA_FILES,
-        GET_REPLICA_LOGS,
-        GET_REPLICA_MAX_LSN,
-        GET_REPLICA_MIN_LSN,
-        UPDATE_REPLICA,
-        GOODBYE,
-        REPLICA_EVENT,
-        LSM_COMPONENT_PROPERTIES,
-        ACK,
-        FLUSH_INDEX
-    }
-
-    public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
-        //read request size
-        NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
-        int requestSize = dataBuffer.getInt();
-
-        if (dataBuffer.capacity() < requestSize) {
-            dataBuffer = ByteBuffer.allocate(requestSize);
-        }
-
-        //read request
-        NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize);
-
-        return dataBuffer;
-    }
-
-    public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
-            throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        lsmCompProp.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
-        }
-        buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
-    }
-
-    public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
-            throws IOException {
-        //read replication request type
-        NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
-
-        ReplicationRequestType requestType = AsterixReplicationProtocol.ReplicationRequestType.values()[byteBuffer
-                .getInt()];
-        return requestType;
-    }
-
-    public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return LSMComponentProperties.create(dis);
-    }
-
-    public static ByteBuffer getGoodbyeBuffer() {
-        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
-        bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
-        bb.flip();
-        return bb;
-    }
-
-    public static ByteBuffer getAckBuffer() {
-        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
-        bb.putInt(ReplicationRequestType.ACK.ordinal());
-        bb.flip();
-        return bb;
-    }
-
-    public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
-        requestBuffer.clear();
-        //put request type (4 bytes)
-        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
-        //leave space for log size
-        requestBuffer.position(requestBuffer.position() + Integer.BYTES);
-        int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
-        //put request size (4 bytes)
-        requestBuffer.putInt(4, logSize);
-        requestBuffer.flip();
-    }
-
-    public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
-        requestBuffer.clear();
-        //put request type (4 bytes)
-        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
-        //length of the log
-        requestBuffer.putInt(serializedLog.length);
-        //the log itself
-        requestBuffer.put(serializedLog);
-        requestBuffer.flip();
-    }
-
-    public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, AsterixLSMIndexFileProperties afp,
-            ReplicationRequestType requestType) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        afp.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (requestBuffer.capacity() < requestSize) {
-            requestBuffer = ByteBuffer.allocate(requestSize);
-        } else {
-            requestBuffer.clear();
-        }
-        requestBuffer.putInt(requestType.ordinal());
-        requestBuffer.putInt(oos.size());
-        requestBuffer.put(outputStream.toByteArray());
-        requestBuffer.flip();
-        return requestBuffer;
-    }
-
-    public static AsterixLSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return AsterixLSMIndexFileProperties.create(dis);
-    }
-
-    public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaLogsRequest.create(dis);
-    }
-
-    public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
-            throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (requestBuffer.capacity() < requestSize) {
-            requestBuffer = ByteBuffer.allocate(requestSize);
-        } else {
-            requestBuffer.clear();
-        }
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
-        requestBuffer.putInt(oos.size());
-        requestBuffer.put(outputStream.toByteArray());
-        requestBuffer.flip();
-        return requestBuffer;
-    }
-
-    public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-
-        oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
-        replica.writeFields(oos);
-        oos.close();
-
-        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
-        buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        return buffer;
-    }
-
-    public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        event.serialize(oos);
-        oos.close();
-
-        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
-        buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
-    }
-
-    public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return Replica.create(dis);
-    }
-
-    public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-
-        return ReplicaEvent.create(dis);
-    }
-
-    public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
-        }
-        buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-    }
-
-    public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
-            throws IOException {
-        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        DataOutputStream oos = new DataOutputStream(outputStream);
-        request.serialize(oos);
-        oos.close();
-
-        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
-        if (buffer.capacity() < requestSize) {
-            buffer = ByteBuffer.allocate(requestSize);
-        } else {
-            buffer.clear();
-        }
-        buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
-        buffer.putInt(oos.size());
-        buffer.put(outputStream.toByteArray());
-        buffer.flip();
-        return buffer;
-    }
-
-    public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaFilesRequest.create(dis);
-    }
-
-    public static ReplicaIndexFlushRequest readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
-        DataInputStream dis = new DataInputStream(bais);
-        return ReplicaIndexFlushRequest.create(dis);
-    }
-
-    public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) {
-        requestBuffer.clear();
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal());
-        requestBuffer.flip();
-    }
-
-    public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
-        requestBuffer.clear();
-        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
-        requestBuffer.flip();
-    }
-
-    public static int getJobIdFromLogAckMessage(String msg) {
-        return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
-    }
-
-    public static String getNodeIdFromLogAckMessage(String msg) {
-        return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
-    }
-
-    /**
-     * Sends a goodbye request to a remote replica indicating the end of a replication request.
-     * 
-     * @param socketChannel
-     *            the remote replica socket.
-     * @throws IOException
-     */
-    public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
-        ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
-        NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
-    }
-
-    public static void sendAck(SocketChannel socketChannel) throws IOException {
-        ByteBuffer ackBuffer = AsterixReplicationProtocol.getAckBuffer();
-        NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
new file mode 100644
index 0000000..be8f8e3
--- /dev/null
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/functions/ReplicationProtocol.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.functions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.replication.Replica;
+import org.apache.asterix.common.replication.ReplicaEvent;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.replication.management.NetworkingUtil;
+import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
+
+public class ReplicationProtocol {
+
+    /**
+     * All replication messages start with the ReplicationRequestType ordinal (4 bytes), then the length of the request in bytes
+     */
+    public static final String JOB_COMMIT_ACK = "$";
+
+    public final static int REPLICATION_REQUEST_TYPE_SIZE = Integer.BYTES;
+    public final static int REPLICATION_REQUEST_HEADER_SIZE = REPLICATION_REQUEST_TYPE_SIZE + Integer.BYTES;
+
+    /* 
+     * ReplicationRequestType:
+     * REPLICATE_LOG: txn log replication
+     * REPLICATE_FILE: replicate a file(s)
+     * DELETE_FILE: delete a file(s)
+     * GET_REPLICA_FILES: used during remote recovery to request lost LSM Components
+     * GET_REPLICA_LOGS: used during remote recovery to request lost txn logs
+     * GET_REPLICA_MAX_LSN: used during remote recovery to initialize a log manager LSN
+     * GET_REPLICA_MIN_LSN: used during remote recovery to specify the low water mark per replica
+     * UPDATE_REPLICA: used to update replica info such as IP Address change.
+     * GOODBYE: used to notify replicas that the replication request has been completed
+     * REPLICA_EVENT: used to notify replicas about a remote replica split/merge.
+     * LSM_COMPONENT_PROPERTIES: used to send the properties of an LSM Component before its physical files are sent
+     * ACK: used to notify the requesting replica that the request has been completed successfully
+     * FLUSH_INDEX: request remote replica to flush an LSM component
+     */
+    public enum ReplicationRequestType {
+        REPLICATE_LOG,
+        REPLICATE_FILE,
+        DELETE_FILE,
+        GET_REPLICA_FILES,
+        GET_REPLICA_LOGS,
+        GET_REPLICA_MAX_LSN,
+        GET_REPLICA_MIN_LSN,
+        UPDATE_REPLICA,
+        GOODBYE,
+        REPLICA_EVENT,
+        LSM_COMPONENT_PROPERTIES,
+        ACK,
+        FLUSH_INDEX
+    }
+
+    public static ByteBuffer readRequest(SocketChannel socketChannel, ByteBuffer dataBuffer) throws IOException {
+        //read request size
+        NetworkingUtil.readBytes(socketChannel, dataBuffer, Integer.BYTES);
+        int requestSize = dataBuffer.getInt();
+
+        if (dataBuffer.capacity() < requestSize) {
+            dataBuffer = ByteBuffer.allocate(requestSize);
+        }
+
+        //read request
+        NetworkingUtil.readBytes(socketChannel, dataBuffer, requestSize);
+
+        return dataBuffer;
+    }
+
+    public static ByteBuffer writeLSMComponentPropertiesRequest(LSMComponentProperties lsmCompProp, ByteBuffer buffer)
+            throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        lsmCompProp.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (buffer.capacity() < requestSize) {
+            buffer = ByteBuffer.allocate(requestSize);
+        } else {
+            buffer.clear();
+        }
+        buffer.putInt(ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+        return buffer;
+    }
+
+    public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer)
+            throws IOException {
+        //read replication request type
+        NetworkingUtil.readBytes(socketChannel, byteBuffer, REPLICATION_REQUEST_TYPE_SIZE);
+
+        ReplicationRequestType requestType = ReplicationProtocol.ReplicationRequestType.values()[byteBuffer
+                .getInt()];
+        return requestType;
+    }
+
+    public static LSMComponentProperties readLSMPropertiesRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return LSMComponentProperties.create(dis);
+    }
+
+    public static ByteBuffer getGoodbyeBuffer() {
+        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+        bb.putInt(ReplicationRequestType.GOODBYE.ordinal());
+        bb.flip();
+        return bb;
+    }
+
+    public static ByteBuffer getAckBuffer() {
+        ByteBuffer bb = ByteBuffer.allocate(REPLICATION_REQUEST_TYPE_SIZE);
+        bb.putInt(ReplicationRequestType.ACK.ordinal());
+        bb.flip();
+        return bb;
+    }
+
+    public static void writeRemoteRecoveryLogRequest(ByteBuffer requestBuffer, ILogRecord logRecord) {
+        requestBuffer.clear();
+        //put request type (4 bytes)
+        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
+        //leave space for log size
+        requestBuffer.position(requestBuffer.position() + Integer.BYTES);
+        int logSize = logRecord.writeRemoteRecoveryLog(requestBuffer);
+        //put request size (4 bytes)
+        requestBuffer.putInt(4, logSize);
+        requestBuffer.flip();
+    }
+
+    public static void writeReplicateLogRequest(ByteBuffer requestBuffer, byte[] serializedLog) {
+        requestBuffer.clear();
+        //put request type (4 bytes)
+        requestBuffer.putInt(ReplicationRequestType.REPLICATE_LOG.ordinal());
+        //length of the log
+        requestBuffer.putInt(serializedLog.length);
+        //the log itself
+        requestBuffer.put(serializedLog);
+        requestBuffer.flip();
+    }
+
+    public static ByteBuffer writeFileReplicationRequest(ByteBuffer requestBuffer, LSMIndexFileProperties afp,
+            ReplicationRequestType requestType) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        afp.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (requestBuffer.capacity() < requestSize) {
+            requestBuffer = ByteBuffer.allocate(requestSize);
+        } else {
+            requestBuffer.clear();
+        }
+        requestBuffer.putInt(requestType.ordinal());
+        requestBuffer.putInt(oos.size());
+        requestBuffer.put(outputStream.toByteArray());
+        requestBuffer.flip();
+        return requestBuffer;
+    }
+
+    public static LSMIndexFileProperties readFileReplicationRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return LSMIndexFileProperties.create(dis);
+    }
+
+    public static ReplicaLogsRequest readReplicaLogsRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return ReplicaLogsRequest.create(dis);
+    }
+
+    public static ByteBuffer writeGetReplicaLogsRequest(ByteBuffer requestBuffer, ReplicaLogsRequest request)
+            throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        request.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (requestBuffer.capacity() < requestSize) {
+            requestBuffer = ByteBuffer.allocate(requestSize);
+        } else {
+            requestBuffer.clear();
+        }
+        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_LOGS.ordinal());
+        requestBuffer.putInt(oos.size());
+        requestBuffer.put(outputStream.toByteArray());
+        requestBuffer.flip();
+        return requestBuffer;
+    }
+
+    public static ByteBuffer writeUpdateReplicaRequest(Replica replica) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+
+        oos.writeInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
+        replica.writeFields(oos);
+        oos.close();
+
+        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+        buffer.putInt(ReplicationRequestType.UPDATE_REPLICA.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        return buffer;
+    }
+
+    public static ByteBuffer writeReplicaEventRequest(ReplicaEvent event) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        event.serialize(oos);
+        oos.close();
+
+        ByteBuffer buffer = ByteBuffer.allocate(REPLICATION_REQUEST_HEADER_SIZE + oos.size());
+        buffer.putInt(ReplicationRequestType.REPLICA_EVENT.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+        return buffer;
+    }
+
+    public static Replica readReplicaUpdateRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return Replica.create(dis);
+    }
+
+    public static ReplicaEvent readReplicaEventRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+
+        return ReplicaEvent.create(dis);
+    }
+
+    public static void writeGetReplicaFilesRequest(ByteBuffer buffer, ReplicaFilesRequest request) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        request.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (buffer.capacity() < requestSize) {
+            buffer = ByteBuffer.allocate(requestSize);
+        } else {
+            buffer.clear();
+        }
+        buffer.putInt(ReplicationRequestType.GET_REPLICA_FILES.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+    }
+
+    public static ByteBuffer writeGetReplicaIndexFlushRequest(ByteBuffer buffer, ReplicaIndexFlushRequest request)
+            throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        DataOutputStream oos = new DataOutputStream(outputStream);
+        request.serialize(oos);
+        oos.close();
+
+        int requestSize = REPLICATION_REQUEST_HEADER_SIZE + oos.size();
+        if (buffer.capacity() < requestSize) {
+            buffer = ByteBuffer.allocate(requestSize);
+        } else {
+            buffer.clear();
+        }
+        buffer.putInt(ReplicationRequestType.FLUSH_INDEX.ordinal());
+        buffer.putInt(oos.size());
+        buffer.put(outputStream.toByteArray());
+        buffer.flip();
+        return buffer;
+    }
+
+    public static ReplicaFilesRequest readReplicaFileRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return ReplicaFilesRequest.create(dis);
+    }
+
+    public static ReplicaIndexFlushRequest readReplicaIndexFlushRequest(ByteBuffer buffer) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit());
+        DataInputStream dis = new DataInputStream(bais);
+        return ReplicaIndexFlushRequest.create(dis);
+    }
+
+    public static void writeGetReplicaMaxLSNRequest(ByteBuffer requestBuffer) {
+        requestBuffer.clear();
+        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal());
+        requestBuffer.flip();
+    }
+
+    public static void writeMinLSNRequest(ByteBuffer requestBuffer) {
+        requestBuffer.clear();
+        requestBuffer.putInt(ReplicationRequestType.GET_REPLICA_MIN_LSN.ordinal());
+        requestBuffer.flip();
+    }
+
+    public static int getJobIdFromLogAckMessage(String msg) {
+        return Integer.parseInt(msg.substring((msg.indexOf(JOB_COMMIT_ACK) + 1)));
+    }
+
+    public static String getNodeIdFromLogAckMessage(String msg) {
+        return msg.substring(0, msg.indexOf(JOB_COMMIT_ACK));
+    }
+
+    /**
+     * Sends a goodbye request to a remote replica indicating the end of a replication request.
+     * 
+     * @param socketChannel
+     *            the remote replica socket.
+     * @throws IOException
+     */
+    public static void sendGoodbye(SocketChannel socketChannel) throws IOException {
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
+        NetworkingUtil.transferBufferToChannel(socketChannel, goodbyeBuffer);
+    }
+
+    public static void sendAck(SocketChannel socketChannel) throws IOException {
+        ByteBuffer ackBuffer = ReplicationProtocol.getAckBuffer();
+        NetworkingUtil.transferBufferToChannel(socketChannel, ackBuffer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
index 38be05e..a7cfaec 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/logging/ReplicationLogBuffer.java
@@ -27,7 +27,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 import org.apache.asterix.replication.management.NetworkingUtil;
 import org.apache.asterix.replication.management.ReplicationManager;
 
@@ -53,7 +53,7 @@ public class ReplicationLogBuffer {
     }
 
     public void append(ILogRecord logRecord) {
-        appendBuffer.putInt(AsterixReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
+        appendBuffer.putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
         appendBuffer.putInt(logRecord.getSerializedLogSize());
         appendBuffer.put(logRecord.getSerializedLog());
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
index 633d87a..9915c83 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaEventNotifier.java
@@ -30,7 +30,7 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 
 public class ReplicaEventNotifier implements Runnable {
 
@@ -61,7 +61,7 @@ public class ReplicaEventNotifier implements Runnable {
 
         ByteBuffer buffer = null;
         try {
-            buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+            buffer = ReplicationProtocol.writeReplicaEventRequest(event);
         } catch (IOException e) {
             e.printStackTrace();
         }
@@ -79,7 +79,7 @@ public class ReplicaEventNotifier implements Runnable {
                     //send replica event
                     connection.write(buffer);
                     //send goodbye
-                    connection.write(AsterixReplicationProtocol.getGoodbyeBuffer());
+                    connection.write(ReplicationProtocol.getGoodbyeBuffer());
                     break;
                 } catch (IOException | UnresolvedAddressException e) {
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index 07ed144..0c94c61 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Callable;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
 
 public class ReplicaStateChecker implements Callable<Void> {
 
@@ -61,7 +61,7 @@ public class ReplicaStateChecker implements Callable<Void> {
                 connection = SocketChannel.open();
                 connection.configureBlocking(true);
                 connection.connect(new InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
-                ByteBuffer buffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+                ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
                 connection.write(buffer);
                 replicationManager.updateReplicaState(replica.getId(), ReplicaState.ACTIVE, suspendReplication);
                 return null;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index e6b2ebf..c97fe94 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -42,7 +42,9 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.config.IAsterixPropertiesProvider;
 import org.apache.asterix.common.context.DatasetLifecycleManager.IndexInfo;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
@@ -59,15 +61,15 @@ import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogSource;
 import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.logging.RemoteLogMapping;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
 import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import org.apache.hyracks.api.application.INCApplicationContext;
 import org.apache.hyracks.storage.am.common.api.IMetaDataPageManager;
@@ -210,7 +212,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         public void run() {
             Thread.currentThread().setName("Replication Thread");
             try {
-                ReplicationRequestType replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+                ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
                         inBuffer);
                 while (replicationFunction != ReplicationRequestType.GOODBYE) {
                     switch (replicationFunction) {
@@ -251,7 +253,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                             throw new IllegalStateException("Unknown replication request");
                         }
                     }
-                    replicationFunction = AsterixReplicationProtocol.getRequestType(socketChannel, inBuffer);
+                    replicationFunction = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                 }
             } catch (Exception e) {
                 e.printStackTrace();
@@ -267,9 +269,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         }
 
         private void handleFlushIndex() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
             //1. read which indexes are requested to be flushed from remote replica
-            ReplicaIndexFlushRequest request = AsterixReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
+            ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer);
             Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
 
             //2. check which indexes can be flushed (open indexes) and which cannot be flushed (closed or have empty memory component)
@@ -302,26 +304,25 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             //the remaining indexes in the requested set are those which cannot be flushed.
             //4. respond back to the requester that those indexes cannot be flushed
             ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
-            outBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
+            outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, laggingIndexesResponse);
             NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
         }
 
         private void handleLSMComponentProperties() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            LSMComponentProperties lsmCompProp = AsterixReplicationProtocol.readLSMPropertiesRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(inBuffer);
             //create mask to indicate that this component is not valid yet
             replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp);
             lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp);
         }
 
         private void handleReplicateFile() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            AsterixLSMIndexFileProperties afp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
 
-            String replicaFolderPath = replicaResourcesManager.getIndexPath(afp.getNodeId(), afp.getIoDeviceNum(),
-                    afp.getDataverse(), afp.getIdxName());
-
-            String replicaFilePath = replicaFolderPath + File.separator + afp.getFileName();
+            //get index path
+            String indexPath = replicaResourcesManager.getIndexPath(afp);
+            String replicaFilePath = indexPath + File.separator + afp.getFileName();
 
             //create file
             File destFile = new File(replicaFilePath);
@@ -334,20 +335,20 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 fileChannel.force(true);
 
                 if (afp.requiresAck()) {
-                    AsterixReplicationProtocol.sendAck(socketChannel);
+                    ReplicationProtocol.sendAck(socketChannel);
                 }
                 if (afp.isLSMComponentFile()) {
-                    String compoentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath(), afp.getNodeId());
+                    String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
                     if (afp.getLSNByteOffset() != IMetaDataPageManager.INVALID_LSN_OFFSET) {
-                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(compoentId,
+                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId,
                                 destFile.getAbsolutePath(), afp.getLSNByteOffset());
                         lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
                     } else {
-                        updateLSMComponentRemainingFiles(compoentId);
+                        updateLSMComponentRemainingFiles(componentId);
                     }
                 } else {
                     //index metadata file
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(replicaFolderPath, logManager.getAppendLSN());
+                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
                 }
             }
         }
@@ -370,43 +371,48 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
         }
 
         private void handleGetReplicaFiles() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaFilesRequest request = AsterixReplicationProtocol.readReplicaFileRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
 
-            AsterixLSMIndexFileProperties fileProperties = new AsterixLSMIndexFileProperties();
+            LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
 
             List<String> filesList;
             Set<String> replicaIds = request.getReplicaIds();
-
+            Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
+                    .getAppContext()).getMetadataProperties().getNodePartitions();
             for (String replicaId : replicaIds) {
-                filesList = replicaResourcesManager.getResourcesForReplica(replicaId);
-
-                //start sending files
-                for (String filePath : filesList) {
-                    try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
-                            FileChannel fileChannel = fromFile.getChannel();) {
-                        long fileSize = fileChannel.size();
-                        fileProperties.initialize(filePath, fileSize, replicaId, false,
-                                IMetaDataPageManager.INVALID_LSN_OFFSET, false);
-                        outBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties,
-                                ReplicationRequestType.REPLICATE_FILE);
-
-                        //send file info
-                        NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
-
-                        //transfer file
-                        NetworkingUtil.sendFile(fileChannel, socketChannel);
+                //get replica partitions
+                ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
+                for (ClusterPartition partition : replicaPatitions) {
+                    filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId());
+
+                    //start sending files
+                    for (String filePath : filesList) {
+                        try (RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
+                                FileChannel fileChannel = fromFile.getChannel();) {
+                            long fileSize = fileChannel.size();
+                            fileProperties.initialize(filePath, fileSize, replicaId, false,
+                                    IMetaDataPageManager.INVALID_LSN_OFFSET, false);
+                            outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer,
+                                    fileProperties, ReplicationRequestType.REPLICATE_FILE);
+
+                            //send file info
+                            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
+
+                            //transfer file
+                            NetworkingUtil.sendFile(fileChannel, socketChannel);
+                        }
                     }
                 }
             }
 
             //send goodbye (end of files)
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         private void handleGetRemoteLogs() throws IOException, ACIDException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaLogsRequest request = AsterixReplicationProtocol.readReplicaLogsRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaLogsRequest request = ReplicationProtocol.readReplicaLogsRequest(inBuffer);
 
             Set<String> replicaIds = request.getReplicaIds();
             long fromLSN = request.getFromLSN();
@@ -433,13 +439,13 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                     if (replicaIds.contains(logRecord.getNodeId()) && logRecord.getLogType() != LogType.FLUSH) {
                         if (logRecord.getSerializedLogSize() > outBuffer.capacity()) {
                             int requestSize = logRecord.getSerializedLogSize()
-                                    + AsterixReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
+                                    + ReplicationProtocol.REPLICATION_REQUEST_HEADER_SIZE;
                             outBuffer = ByteBuffer.allocate(requestSize);
                         }
 
                         //set log source to REMOTE_RECOVERY to avoid re-logging on the recipient side
                         logRecord.setLogSource(LogSource.REMOTE_RECOVERY);
-                        AsterixReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
+                        ReplicationProtocol.writeRemoteRecoveryLogRequest(outBuffer, logRecord);
                         NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
                     }
                     logRecord = logReader.next();
@@ -449,32 +455,32 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             }
 
             //send goodbye (end of logs)
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         private void handleUpdateReplica() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            Replica replica = AsterixReplicationProtocol.readReplicaUpdateRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            Replica replica = ReplicationProtocol.readReplicaUpdateRequest(inBuffer);
             replicationManager.updateReplicaInfo(replica);
         }
 
         private void handleReplicaEvent() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            ReplicaEvent event = AsterixReplicationProtocol.readReplicaEventRequest(inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(inBuffer);
             replicationManager.reportReplicaEvent(event);
         }
 
         private void handleDeleteFile() throws IOException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
-            AsterixLSMIndexFileProperties fileProp = AsterixReplicationProtocol.readFileReplicationRequest(inBuffer);
-            replicaResourcesManager.deleteRemoteFile(fileProp);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
+            LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(inBuffer);
+            replicaResourcesManager.deleteIndexFile(fileProp);
             if (fileProp.requiresAck()) {
-                AsterixReplicationProtocol.sendAck(socketChannel);
+                ReplicationProtocol.sendAck(socketChannel);
             }
         }
 
         private void handleLogReplication() throws IOException, ACIDException {
-            inBuffer = AsterixReplicationProtocol.readRequest(socketChannel, inBuffer);
+            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
 
             //Deserialize log
             remoteLog.readRemoteLog(inBuffer, false, localNodeID);
@@ -518,7 +524,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
                 //send ACK to requester
                 try {
                     socketChannel.socket().getOutputStream().write(
-                            (localNodeID + AsterixReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
+                            (localNodeID + ReplicationProtocol.JOB_COMMIT_ACK + logRecord.getJobId() + "\n")
                                     .getBytes());
                     socketChannel.socket().getOutputStream().flush();
                 } catch (IOException e) {
@@ -625,6 +631,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
             }
 
             File destFile = new File(syncTask.getComponentFilePath());
+            //prepare local LSN buffer
             ByteBuffer metadataBuffer = ByteBuffer.allocate(Long.BYTES);
             metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
             metadataBuffer.flip();

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 36e5dff..5c35df4 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -66,17 +66,16 @@ import org.apache.asterix.common.transactions.ILogRecord;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol;
-import org.apache.asterix.replication.functions.AsterixReplicationProtocol.ReplicationRequestType;
+import org.apache.asterix.replication.functions.ReplicationProtocol;
+import org.apache.asterix.replication.functions.ReplicationProtocol.ReplicationRequestType;
 import org.apache.asterix.replication.functions.ReplicaFilesRequest;
 import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
 import org.apache.asterix.replication.functions.ReplicaLogsRequest;
 import org.apache.asterix.replication.logging.ReplicationLogBuffer;
 import org.apache.asterix.replication.logging.ReplicationLogFlusher;
-import org.apache.asterix.replication.storage.AsterixLSMIndexFileProperties;
 import org.apache.asterix.replication.storage.LSMComponentProperties;
+import org.apache.asterix.replication.storage.LSMIndexFileProperties;
 import org.apache.asterix.replication.storage.ReplicaResourcesManager;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.replication.IReplicationJob;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
@@ -129,7 +128,8 @@ public class ReplicationManager implements IReplicationManager {
     private ReplicationLogFlusher txnlogsReplicator;
     private Future<? extends Object> txnLogReplicatorTask;
     private Map<String, SocketChannel> logsReplicaSockets = null;
-
+    //TODO this class needs to be refactored by moving its private classes to separate files
+    //and possibly using MessageBroker to send/receive remote replicas events.
     public ReplicationManager(String nodeId, AsterixReplicationProperties replicationProperties,
             IReplicaResourcesManager remoteResoucesManager, ILogManager logManager,
             IAsterixAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
@@ -255,7 +255,7 @@ public class ReplicationManager implements IReplicationManager {
             throws IOException {
         boolean isLSMComponentFile;
         ByteBuffer responseBuffer = null;
-        AsterixLSMIndexFileProperties asterixFileProperties = new AsterixLSMIndexFileProperties();
+        LSMIndexFileProperties asterixFileProperties = new LSMIndexFileProperties();
         if (requestBuffer == null) {
             requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         }
@@ -277,7 +277,7 @@ public class ReplicationManager implements IReplicationManager {
                         //send LSMComponent properties
                         LSMComponentJob = (ILSMIndexReplicationJob) job;
                         LSMComponentProperties lsmCompProp = new LSMComponentProperties(LSMComponentJob, nodeId);
-                        requestBuffer = AsterixReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
+                        requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(lsmCompProp,
                                 requestBuffer);
                         sendRequest(replicasSockets, requestBuffer);
                     }
@@ -310,7 +310,7 @@ public class ReplicationManager implements IReplicationManager {
                                         IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
                             }
 
-                            requestBuffer = AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer,
+                            requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer,
                                     asterixFileProperties, ReplicationRequestType.REPLICATE_FILE);
 
                             Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -350,7 +350,7 @@ public class ReplicationManager implements IReplicationManager {
                     remainingFiles--;
                     asterixFileProperties.initialize(filePath, -1, nodeId, isLSMComponentFile,
                             IMetaDataPageManager.INVALID_LSN_OFFSET, remainingFiles == 0);
-                    AsterixReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
+                    ReplicationProtocol.writeFileReplicationRequest(requestBuffer, asterixFileProperties,
                             ReplicationRequestType.DELETE_FILE);
 
                     Iterator<Map.Entry<String, SocketChannel>> iterator = replicasSockets.entrySet().iterator();
@@ -392,13 +392,13 @@ public class ReplicationManager implements IReplicationManager {
     private static ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer responseBuffer)
             throws IOException {
         if (responseBuffer == null) {
-            responseBuffer = ByteBuffer.allocate(AsterixReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
+            responseBuffer = ByteBuffer.allocate(ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE);
         } else {
             responseBuffer.clear();
         }
 
         //read response from remote replicas
-        ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+        ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
                 responseBuffer);
         return responseFunction;
     }
@@ -519,7 +519,7 @@ public class ReplicationManager implements IReplicationManager {
         node.setClusterIp(newAddress);
         Replica replica = new Replica(node);
 
-        ByteBuffer buffer = AsterixReplicationProtocol.writeUpdateReplicaRequest(replica);
+        ByteBuffer buffer = ReplicationProtocol.writeUpdateReplicaRequest(replica);
         Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
         sendRequest(replicaSockets, buffer);
         closeReplicaSockets(replicaSockets);
@@ -537,7 +537,7 @@ public class ReplicationManager implements IReplicationManager {
         node.setClusterIp(NetworkingUtil.getHostAddress(hostIPAddressFirstOctet));
         Replica replica = new Replica(node);
         ReplicaEvent event = new ReplicaEvent(replica, ReplicaEventType.SHUTDOWN);
-        ByteBuffer buffer = AsterixReplicationProtocol.writeReplicaEventRequest(event);
+        ByteBuffer buffer = ReplicationProtocol.writeReplicaEventRequest(event);
         Map<String, SocketChannel> replicaSockets = getActiveRemoteReplicasSockets();
         sendRequest(replicaSockets, buffer);
         closeReplicaSockets(replicaSockets);
@@ -581,7 +581,7 @@ public class ReplicationManager implements IReplicationManager {
      */
     private void closeReplicaSockets(Map<String, SocketChannel> replicaSockets) {
         //send goodbye
-        ByteBuffer goodbyeBuffer = AsterixReplicationProtocol.getGoodbyeBuffer();
+        ByteBuffer goodbyeBuffer = ReplicationProtocol.getGoodbyeBuffer();
         sendRequest(replicaSockets, goodbyeBuffer);
 
         Iterator<Map.Entry<String, SocketChannel>> iterator = replicaSockets.entrySet().iterator();
@@ -910,7 +910,7 @@ public class ReplicationManager implements IReplicationManager {
         ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
         for (String replicaId : replicaIds) {
             //1. identify replica indexes with LSN less than nonSharpCheckpointTargetLSN.
-            HashMap<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
+            Map<Long, String> laggingIndexes = replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId,
                     nonSharpCheckpointTargetLSN);
 
             if (laggingIndexes.size() > 0) {
@@ -919,7 +919,7 @@ public class ReplicationManager implements IReplicationManager {
                 try (SocketChannel socketChannel = getReplicaSocket(replicaId)) {
                     ReplicaIndexFlushRequest laggingIndexesRequest = new ReplicaIndexFlushRequest(
                             laggingIndexes.keySet());
-                    requestBuffer = AsterixReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
+                    requestBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer,
                             laggingIndexesRequest);
                     NetworkingUtil.transferBufferToChannel(socketChannel, requestBuffer);
 
@@ -927,19 +927,19 @@ public class ReplicationManager implements IReplicationManager {
                     ReplicationRequestType responseFunction = waitForResponse(socketChannel, requestBuffer);
 
                     if (responseFunction == ReplicationRequestType.FLUSH_INDEX) {
-                        requestBuffer = AsterixReplicationProtocol.readRequest(socketChannel, requestBuffer);
+                        requestBuffer = ReplicationProtocol.readRequest(socketChannel, requestBuffer);
                         //returning the indexes that were not flushed
-                        laggingIndexesResponse = AsterixReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
+                        laggingIndexesResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
                     }
                     //send goodbye
-                    AsterixReplicationProtocol.sendGoodbye(socketChannel);
+                    ReplicationProtocol.sendGoodbye(socketChannel);
                 }
 
                 //4. update the LSN_MAP for indexes that were not flushed to the current append LSN to indicate no operations happend.
                 if (laggingIndexesResponse != null) {
                     for (Long resouceId : laggingIndexesResponse.getLaggingRescouresIds()) {
                         String indexPath = laggingIndexes.get(resouceId);
-                        HashMap<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
+                        Map<Long, Long> indexLSNMap = replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
                         indexLSNMap.put(ReplicaResourcesManager.REPLICA_INDEX_CREATION_LSN, startLSN);
                         replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, indexLSNMap);
                     }
@@ -953,7 +953,7 @@ public class ReplicationManager implements IReplicationManager {
     public long getMaxRemoteLSN(Set<String> remoteReplicas) throws IOException {
         long maxRemoteLSN = 0;
 
-        AsterixReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
+        ReplicationProtocol.writeGetReplicaMaxLSNRequest(dataBuffer);
         Map<String, SocketChannel> replicaSockets = new HashMap<String, SocketChannel>();
         try {
             for (String replicaId : remoteReplicas) {
@@ -988,26 +988,26 @@ public class ReplicationManager implements IReplicationManager {
     @Override
     public void requestReplicaFiles(String selectedReplicaId, Set<String> replicasDataToRecover) throws IOException {
         ReplicaFilesRequest request = new ReplicaFilesRequest(replicasDataToRecover);
-        AsterixReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
+        ReplicationProtocol.writeGetReplicaFilesRequest(dataBuffer, request);
 
         try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId)) {
 
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
 
-            String destFolder;
+            String indexPath;
             String destFilePath;
-
-            ReplicationRequestType responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel,
+            ReplicationRequestType responseFunction = ReplicationProtocol.getRequestType(socketChannel,
                     dataBuffer);
-            AsterixLSMIndexFileProperties fileProperties;
+            LSMIndexFileProperties fileProperties;
             while (responseFunction != ReplicationRequestType.GOODBYE) {
-                dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
+
+                fileProperties = ReplicationProtocol.readFileReplicationRequest(dataBuffer);
 
-                fileProperties = AsterixReplicationProtocol.readFileReplicationRequest(dataBuffer);
-                destFolder = replicaResourcesManager.getIndexPath(fileProperties.getNodeId(),
-                        fileProperties.getIoDeviceNum(), fileProperties.getDataverse(), fileProperties.getIdxName());
-                destFilePath = destFolder + File.separator + fileProperties.getFileName();
+                //get index path
+                indexPath = replicaResourcesManager.getIndexPath(fileProperties);
+                destFilePath = indexPath + File.separator + fileProperties.getFileName();
 
                 //create file
                 File destFile = new File(destFilePath);
@@ -1024,14 +1024,14 @@ public class ReplicationManager implements IReplicationManager {
                 //we need to create LSN map for .metadata files that belong to remote replicas
                 if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(nodeId)) {
                     //replica index
-                    replicaResourcesManager.initializeReplicaIndexLSNMap(destFolder, logManager.getAppendLSN());
+                    replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, logManager.getAppendLSN());
                 }
 
-                responseFunction = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+                responseFunction = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
     }
 
@@ -1039,7 +1039,7 @@ public class ReplicationManager implements IReplicationManager {
     @Override
     public long requestReplicaMinLSN(String selectedReplicaId) throws IOException {
         long minLSN = 0;
-        AsterixReplicationProtocol.writeMinLSNRequest(dataBuffer);
+        ReplicationProtocol.writeMinLSNRequest(dataBuffer);
         try (SocketChannel socketChannel = getReplicaSocket(selectedReplicaId);) {
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
@@ -1049,7 +1049,7 @@ public class ReplicationManager implements IReplicationManager {
             minLSN = dataBuffer.getLong();
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
         }
 
         return minLSN;
@@ -1060,19 +1060,19 @@ public class ReplicationManager implements IReplicationManager {
     public ArrayList<ILogRecord> requestReplicaLogs(String remoteNode, Set<String> nodeIdsToRecoverFor, long fromLSN)
             throws IOException, ACIDException {
         ReplicaLogsRequest request = new ReplicaLogsRequest(nodeIdsToRecoverFor, fromLSN);
-        dataBuffer = AsterixReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
+        dataBuffer = ReplicationProtocol.writeGetReplicaLogsRequest(dataBuffer, request);
 
         try (SocketChannel socketChannel = getReplicaSocket(remoteNode)) {
             //transfer request
             NetworkingUtil.transferBufferToChannel(socketChannel, dataBuffer);
 
             //read response type
-            ReplicationRequestType responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+            ReplicationRequestType responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
 
             ArrayList<ILogRecord> recoveryLogs = new ArrayList<ILogRecord>();
             ILogRecord logRecord = new LogRecord();
             while (responseType != ReplicationRequestType.GOODBYE) {
-                dataBuffer = AsterixReplicationProtocol.readRequest(socketChannel, dataBuffer);
+                dataBuffer = ReplicationProtocol.readRequest(socketChannel, dataBuffer);
                 logRecord.readRemoteLog(dataBuffer, true, nodeId);
 
                 if (logRecord.getNodeId().equals(nodeId)) {
@@ -1085,11 +1085,11 @@ public class ReplicationManager implements IReplicationManager {
                     logManager.log(logRecord);
                 }
 
-                responseType = AsterixReplicationProtocol.getRequestType(socketChannel, dataBuffer);
+                responseType = ReplicationProtocol.getRequestType(socketChannel, dataBuffer);
             }
 
             //send goodbye
-            AsterixReplicationProtocol.sendGoodbye(socketChannel);
+            ReplicationProtocol.sendGoodbye(socketChannel);
             return recoveryLogs;
         }
     }
@@ -1136,11 +1136,7 @@ public class ReplicationManager implements IReplicationManager {
             updateReplicaState(replicaId, ReplicaState.DEAD, true);
 
             //delete any invalid LSMComponents for this replica
-            try {
-                replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
-            } catch (HyracksDataException e) {
-                e.printStackTrace();
-            }
+            replicaResourcesManager.cleanInvalidLSMComponents(replicaId);
         }
 
         public void handleShutdownEvent(String replicaId) {
@@ -1237,8 +1233,8 @@ public class ReplicationManager implements IReplicationManager {
                         break;
                     }
                     //read ACK for job commit log
-                    String replicaId = AsterixReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
-                    int jobId = AsterixReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
+                    String replicaId = ReplicationProtocol.getNodeIdFromLogAckMessage(responseLine);
+                    int jobId = ReplicationProtocol.getJobIdFromLogAckMessage(responseLine);
                     addAckToJob(jobId, replicaId);
                 }
             } catch (AsynchronousCloseException e1) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index a82b535..ee987f8 100644
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ b/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.replication.recovery;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,10 +31,12 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.config.AsterixReplicationProperties;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.transactions.ILogManager;
 import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 
 public class RemoteRecoveryManager implements IRemoteRecoveryManager {
@@ -55,13 +58,20 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
     @Override
     public void performRemoteRecovery() {
+        //TODO this method needs to be adapted to perform failback when autoFailover is enabled.
+        //Currently we will not allow a node to perform remote recovery since another replica
+        //already took over its workload and might not resync correctly if there are on-going
+        //jobs on the replica.
+        if (AsterixClusterProperties.INSTANCE.isAutoFailoverEnabled()) {
+            throw new IllegalStateException("Cannot perform remote recovery when auto failover is enabled.");
+        }
         //The whole remote recovery process should be atomic.
         //Any error happens, we should start the recovery from the start until the recovery is complete or an illegal state is reached (cannot recovery).
         int maxRecoveryAttempts = 10;
         PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) runtimeContext
                 .getLocalResourceRepository();
         while (true) {
-            //start recovery recovery steps
+            //start recovery steps
             try {
                 maxRecoveryAttempts--;
 
@@ -76,7 +86,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 int activeReplicasCount = replicationManager.getActiveReplicasCount();
 
                 if (activeReplicasCount == 0) {
-                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to performe remote recovery");
+                    throw new IllegalStateException("no ACTIVE remote replica(s) exists to perform remote recovery");
                 }
 
                 //2. clean any memory data that could've existed from previous failed recovery attempt
@@ -90,8 +100,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                 Map<String, Set<String>> selectedRemoteReplicas = constructRemoteRecoveryPlan();
 
                 //5. get max LSN from selected remote replicas
-                long maxRemoteLSN = 0;
-                maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
+                long maxRemoteLSN = replicationManager.getMaxRemoteLSN(selectedRemoteReplicas.keySet());
 
                 //6. force LogManager to start from a partition > maxLSN in selected remote replicas
                 logManager.renewLogFilesAndStartFromLSN(maxRemoteLSN);
@@ -107,8 +116,7 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
                     //2. Initialize local resources based on the newly received files (if we are recovering the primary replica on this node)
                     if (replicasDataToRecover.contains(logManager.getNodeId())) {
                         ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
-                                .initializeNewUniverse(
-                                        runtimeContext.getReplicaResourcesManager().getLocalStorageFolder());
+                                .initializeNewUniverse(AsterixClusterProperties.INSTANCE.getStorageDirectoryName());
                         //initialize resource id factor to correct max resource id
                         runtimeContext.initializeResourceIdFactory();
                     }
@@ -140,7 +148,6 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
     }
 
     private Map<String, Set<String>> constructRemoteRecoveryPlan() {
-
         //1. identify which replicas reside in this node
         String localNodeId = logManager.getNodeId();
         Set<String> nodes = replicationProperties.getNodeReplicasIds(localNodeId);
@@ -205,4 +212,14 @@ public class RemoteRecoveryManager implements IRemoteRecoveryManager {
 
         return recoveryList;
     }
+
+    @Override
+    public void takeoverPartitons(String failedNode, Integer[] partitions) throws IOException, ACIDException {
+        long minLSN = runtimeContext.getReplicaResourcesManager().getPartitionsMinLSN(partitions);
+        //replay logs > minLSN that belong to these partitions
+        //TODO (mhubail) currently we assume the logs for these partitions belong to the failed node
+        //this needs to be updated once log formats are updated to include the partition id
+        runtimeContext.getTransactionSubsystem().getRecoveryManager().replayPartitionsLogs(partitions, minLSN,
+                failedNode);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/8fc8bf8b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
----------------------------------------------------------------------
diff --git a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java b/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
deleted file mode 100644
index 67b39c4..0000000
--- a/asterix-replication/src/main/java/org/apache/asterix/replication/storage/AsterixFilesUtil.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.replication.storage;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
-
-public class AsterixFilesUtil {
-
-    public static void deleteFolder(String folderPath) throws IOException {
-        File folder = new File(folderPath);
-        if (folder.exists()) {
-            //delete files inside the folder
-            while (deleteDirecotryFiles(folderPath) != true) {
-                //if there is a file being written (e.g. LSM Component), wait and try again to delete the file
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e) {
-                    //ignore
-                }
-            }
-
-            //delete the folder itself
-            folder.delete();
-        }
-    }
-
-    private static boolean deleteDirecotryFiles(String dirPath) throws IOException {
-        try {
-            Path directory = Paths.get(dirPath);
-            Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
-                @Override
-                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
-                    Files.delete(file);
-                    return FileVisitResult.CONTINUE;
-                }
-
-                @Override
-                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-                    Files.delete(dir);
-                    return FileVisitResult.CONTINUE;
-                }
-
-                @Override
-                public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
-                    return FileVisitResult.CONTINUE;
-                }
-
-            });
-            return true;
-        } catch (Exception e) {
-            e.printStackTrace();
-            return false;
-        }
-    }
-}